Python Polars:低内存读取、处理、向 Hadoop 写入 parquet

问题描述 投票:0回答:1

我希望能够在 Polars 中处理非常大的文件而不会耗尽内存。在文档中,他们建议使用扫描、lazyframes 和接收器,但很难找到有关如何在实践中执行此操作的正确文档。希望这里的专家能帮忙。

这里我提供了一个适用于可以在内存中处理的“较小”文件的示例。

1.设置

# Imports
import pandas as pd
import polars as pl
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow._hdfs import HadoopFileSystem

# Setting up HDFS file system
hdfs_filesystem = HDFSConnection('default')
hdfs_out_path_1 = "scanexample.parquet"
hdfs_out_path_2 = "scanexample2.parquet"

2.创建数据

# Dataset
df = pd.DataFrame({
    'A': np.arange(10000),
    'B': np.arange(10000),
    'C': np.arange(10000),
    'D': np.arange(10000),
})

# Writing to Hadoop
pq_table = pa.Table.from_pandas(df)
pq_writer = pq.ParquetWriter(hdfs_out_path_1, 
                             schema=pq_table.schema, 
                             filesystem=hdfs_filesystem)

# Appending to parquet file
pq_writer.write_table(pq_table)
pq_writer.close()

3.将镶木地板读入极坐标数据帧(在内存中)

# Read file
pq_df = pl.read_parquet(source=hdfs_out_path_1, 
                        use_pyarrow=True, 
                        pyarrow_options={"filesystem": hdfs_filesystem})

4.进行转换并写入文件

 # Transforms and write
 pq_df.filter(pl.col('A')>9000)\
      .write_parquet(file = hdfs_out_path_2, use_pyarrow=True, pyarrow_options={"filesystem": hdfs_filesystem})

5.现在用低内存做同样的事情

# Scanning file: Attempt 1
scan_df = pl.scan_parquet(source = hdfs_out_path_2)
ERROR: Cannot find file

# Scanning file: Attempt 2
scan_df = pl.scan_parquet(source = hdfs_filesystem.open_input_stream(hdfs_out_path_1))
ERROR: expected str, bytes or os.PathLike object, not pyarrow.lib.NativeFile

根据 polars 文档 scan_parquet 不采用 pyarrow 参数。但它谈到采取一些“存储选项”,我想这就是我需要使用的。但如何呢?

6.没有 Hadoop 的示例

# Writing to parquet
df.to_parquet(path="testlocal.parquet")

# Read lazily 
lazy_df = pl.scan_parquet(source="testlocal.parquet")

# Transforms and write
lazy_df.filter(pl.col('A')>9000).sink_parquet(path= "testlocal.out.parquet")
stream parquet python-polars pyarrow sink
1个回答
0
投票

首先是坏消息,没有办法直接沉入文件系统。如果您想使用

sink_parquet
,您必须先沉入本地存储,然后进行复制。看起来这种情况很快就会随着此 PR 而改变,但这不在当前版本中。

对于扫描(即只读),您有两种选择。一是使用

scan_pyarrow_dataset

可能看起来像这样

import pyarrow.dataset as ds
pq_lf = pl.scan_pyarrow_dataset(
          ds.dataset(hdfs_out_path_1, 
                        filesystem= hdfs_filesystem
          )
        )

现在你有了一个懒惰的框架。这可能是最好的方法,因为您已经在使用 pyarrow.fs,它似乎独立于 fsspec,这就是 Polars 访问云文件的方式。

使用

scan_parquet
的 storage_options 的另一个选项在底层使用
fsspec.open
。这种用法可能看起来像这样

pq_lf=pl.scan_parquet(f"hdfs://{hdfs_out_path_1}, storage_options=some_dict)

相对于您当前在

storage_options
中所做的事情,您的
hdfs_filesystem = HDFSConnection('default')
的外观将由您的 hdfs 提供商决定。

准下沉解决方法

你可以做这样的事情

schema=lazy_df.fetch(1).to_arrow().schema
with pq.ParquetWriter(hdfs_out_path_1, 
                             schema=schema, 
                             filesystem=hdfs_filesystem) as pq_writer:
    for A_chunk in [(9000, 12000), (12000, 15000), (15000,20000)]:
        pq_writer.write_table(
            lazy_df
                .filter(pl.col('A').is_between(A_chunk))
                 # I'd also put a sort here
                .collect()
                .to_arrow()
                )

旁注:

当 Polars 写入 Parquet 文件时,即使是通过 pyarrow,它也会关闭统计信息,这将排除未来对行组的读取优化,因此如果您需要统计信息,您需要显式指定

statistics=True
。 pyarrow 默认使用 snappy 压缩,但 Polars 使用 zstd。我认为,在这一点上,pyarrow 的默认值只是长期以来默认的东西之一,他们无法更改它。我这样说是因为我尝试使用的所有东西似乎都支持 zstd,并且生成的文件比 snappy 更小,而不会真正牺牲性能,所以如果您使用 pyarrow writer,那么我会设置
compression='zstd'
。当您像上面一样使用 ParquetWriter 时,每次调用 write_table 都会生成一个不同的 row_group,因此您可以通过此处的分块策略来真正优化未来的读取,该策略与您稍后读取文件的方式一致(只要您打开统计信息) ).

© www.soinside.com 2019 - 2024. All rights reserved.