在 LazyFrame 上使用 py-polars sink_parquet 方法时出现问题

问题描述 投票:0回答:1

在 LazyFrame 上使用 sink_parquet 时出现以下错误。早些时候,我在

.collect()
的输出上使用
scan_parquet()
将结果转换为 DataFrame,但不幸的是它不适用于大于 RAM 的数据集。这是我收到的错误 -

PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'

在 LazyFrame 上添加一些过滤器和连接条件后,我尝试将 LazyFrame(scan_parquet 的输出)写入本地文件。 看来错误来自以下位置 -

https://github.com/pola-rs/polars/blob/master/py-polars/polars/internals/lazyframe/frame.py#L1235(Python)

https://github.com/pola-rs/polars/blob/master/polars/polars-lazy/src/physical_plan/planner/lp.rs#L154(Rust).

我已尝试更新到最新版本0.15.160.16.1,但此问题仍然存在。

示例代码:

pl.scan_parquet("path/to/file1.parquet")
.select([
    pl.col("col2"),
    pl.col("col2").apply( lambda x : ...)
    .alias("splited_levels"),
    ..followed by more columns and .alias()
])
.join(<another lazyframe>,on="some key",how="inner")
.filter(...)
.filter(..)
..followed by some more filters
.sink_parquet("path/to/result2.parquet")

镶木地板文件应写入本地系统。相反,我收到以下错误 -

PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'

以下是我使用后安装的软件包的详细信息

polars.show_versions()
-

--- Version info----
Polars : 0.15.16
Index type : UInt32
Platform : Linux-4.15.0-191-generic-x86_64-with-glibc2.28
Python: 3.9.16
[GCC 8.3.0]
--- Optional dependencies---
pyarrow : 11.0.0
pandas : not installed
numpy : 1.24.1
fsspec : 2023.1.0
connectorx : not installed
xlsx2csv : not installed
deltalake: not installed
matplotlib : not installed

更新:我在这里提出了一个 github 问题,目前似乎不支持所有类型的流式传输。因此,我正在寻找解决这种情况的方法,或者使用极地进行此操作的任何替代方法 https://github.com/pola-rs/polars/issues/6603

parquet python-polars
1个回答
0
投票

我不知道幕后发生了什么,但我发现对我有用的一件事是设置

.collect(streaming=True)
,甚至设置
pl.Config.set_streaming_chunk_size()
,如果它仍然会破坏内存。

我们可以看到目前

pl.cocat_list()
目前不支持流媒体,这意味着我们将无法
.sink_parquet()
:

(pl.LazyFrame({'a':'word', 'b': 'word2'})
 .with_columns(joined = pl.concat_list(pl.col('a'), 
                                       pl.col('b'))
              )
 .explain(streaming=True))
 WITH_COLUMNS:
 [col("a").list.concat([col("b")]).alias("joined")]
  --- STREAMING
DF ["a", "b"]; PROJECT */2 COLUMNS; SELECTION: "None"  --- END STREAMING

    DF []; PROJECT */0 COLUMNS; SELECTION: "None"

因此,我将使用以下内容:

pl.Config.set_streaming_chunk_size(1000)

(pl.LazyFrame({'a':'word', 'b': 'word2'})
 .with_columns(joined = pl.concat_list(pl.col('a'), 
                                       pl.col('b'))
              )
 .collect(streaming=True)
 .write_parquet('test.parquet')
)

在实践中,由于我对幕后发生的事情一无所知,这似乎会尽可能多地进行流式传输,并且一旦到达需要它的操作,就会诉诸于内存中的收集。

对于特别大的数据,例如列表或字符串几乎太大而无法放入内存,设置较小的块大小有时允许我写入文件。

© www.soinside.com 2019 - 2024. All rights reserved.