我希望能够在 Polars 中处理非常大的文件而不会耗尽内存。在文档中,他们建议使用扫描、lazyframes 和接收器,但很难找到有关如何在实践中执行此操作的正确文档。希望这里的专家能帮忙。
这里我提供了一个适用于可以在内存中处理的“较小”文件的示例。
# Imports
import pandas as pd
import polars as pl
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow._hdfs import HadoopFileSystem
# Setting up HDFS file system
hdfs_filesystem = HDFSConnection('default')
hdfs_out_path_1 = "scanexample.parquet"
hdfs_out_path_2 = "scanexample2.parquet"
# Dataset
df = pd.DataFrame({
'A': np.arange(10000),
'B': np.arange(10000),
'C': np.arange(10000),
'D': np.arange(10000),
})
# Writing to Hadoop
pq_table = pa.Table.from_pandas(df)
pq_writer = pq.ParquetWriter(hdfs_out_path_1,
schema=pq_table.schema,
filesystem=hdfs_filesystem)
# Appending to parquet file
pq_writer.write_table(pq_table)
pq_writer.close()
# Read file
pq_df = pl.read_parquet(source=hdfs_out_path_1,
use_pyarrow=True,
pyarrow_options={"filesystem": hdfs_filesystem})
# Transforms and write
pq_df.filter(pl.col('A')>9000)\
.write_parquet(file = hdfs_out_path_2, use_pyarrow=True, pyarrow_options={"filesystem": hdfs_filesystem})
# Scanning file: Attempt 1
scan_df = pl.scan_parquet(source = hdfs_out_path_2)
ERROR: Cannot find file
# Scanning file: Attempt 2
scan_df = pl.scan_parquet(source = hdfs_filesystem.open_input_stream(hdfs_out_path_1))
ERROR: expected str, bytes or os.PathLike object, not pyarrow.lib.NativeFile
根据 polars 文档 scan_parquet 不采用 pyarrow 参数。但它谈到采取一些“存储选项”,我想这就是我需要使用的。但如何呢?
# Writing to parquet
df.to_parquet(path="testlocal.parquet")
# Read lazily
lazy_df = pl.scan_parquet(source="testlocal.parquet")
# Transforms and write
lazy_df.filter(pl.col('A')>9000).sink_parquet(path= "testlocal.out.parquet")
首先是坏消息,没有办法直接沉入文件系统。如果您想使用
sink_parquet
,您必须先沉入本地存储,然后进行复制。看起来这种情况很快就会随着此 PR 而改变,但这不在当前版本中。
scan_pyarrow_dataset
可能看起来像这样
import pyarrow.dataset as ds
pq_lf = pl.scan_pyarrow_dataset(
ds.dataset(hdfs_out_path_1,
filesystem= hdfs_filesystem
)
)
现在你有了一个懒惰的框架。这可能是最好的方法,因为您已经在使用 pyarrow.fs,它似乎独立于 fsspec,这就是 Polars 访问云文件的方式。
scan_parquet
的 storage_options 的另一个选项在底层使用 fsspec.open
。这种用法可能看起来像这样
pq_lf=pl.scan_parquet(f"hdfs://{hdfs_out_path_1}, storage_options=some_dict)
相对于您当前在
storage_options
中所做的事情,您的 hdfs_filesystem = HDFSConnection('default')
的外观将由您的 hdfs 提供商决定。
准下沉解决方法
你可以做这样的事情
schema=lazy_df.fetch(1).to_arrow().schema
with pq.ParquetWriter(hdfs_out_path_1,
schema=schema,
filesystem=hdfs_filesystem) as pq_writer:
for A_chunk in [(9000, 12000), (12000, 15000), (15000,20000)]:
pq_writer.write_table(
lazy_df
.filter(pl.col('A').is_between(A_chunk))
# I'd also put a sort here
.collect()
.to_arrow()
)
旁注:
当 Polars 写入 Parquet 文件时,即使是通过 pyarrow,它也会关闭统计信息,这将排除未来对行组的读取优化,因此如果您需要统计信息,您需要显式指定
statistics=True
。 pyarrow 默认使用 snappy 压缩,但 Polars 使用 zstd。我认为,在这一点上,pyarrow 的默认值只是长期以来默认的东西之一,他们无法更改它。我这样说是因为我尝试使用的所有东西似乎都支持 zstd,并且生成的文件比 snappy 更小,而不会真正牺牲性能,所以如果您使用 pyarrow writer,那么我会设置 compression='zstd'
。当您像上面一样使用 ParquetWriter 时,每次调用 write_table 都会生成一个不同的 row_group,因此您可以通过此处的分块策略来真正优化未来的读取,该策略与您稍后读取文件的方式一致(只要您打开统计信息) ).