在 LazyFrame 上使用 sink_parquet 时出现以下错误。早些时候,我在
.collect()
的输出上使用 scan_parquet()
将结果转换为 DataFrame,但不幸的是它不适用于大于 RAM 的数据集。这是我收到的错误 -
PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
在 LazyFrame 上添加一些过滤器和连接条件后,我尝试将 LazyFrame(scan_parquet 的输出)写入本地文件。 看来错误来自以下位置 -
https://github.com/pola-rs/polars/blob/master/py-polars/polars/internals/lazyframe/frame.py#L1235(Python)
我已尝试更新到最新版本0.15.160.16.1,但此问题仍然存在。
示例代码:
pl.scan_parquet("path/to/file1.parquet")
.select([
pl.col("col2"),
pl.col("col2").apply( lambda x : ...)
.alias("splited_levels"),
..followed by more columns and .alias()
])
.join(<another lazyframe>,on="some key",how="inner")
.filter(...)
.filter(..)
..followed by some more filters
.sink_parquet("path/to/result2.parquet")
镶木地板文件应写入本地系统。相反,我收到以下错误 -
PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
以下是我使用后安装的软件包的详细信息
polars.show_versions()
-
--- Version info----
Polars : 0.15.16
Index type : UInt32
Platform : Linux-4.15.0-191-generic-x86_64-with-glibc2.28
Python: 3.9.16
[GCC 8.3.0]
--- Optional dependencies---
pyarrow : 11.0.0
pandas : not installed
numpy : 1.24.1
fsspec : 2023.1.0
connectorx : not installed
xlsx2csv : not installed
deltalake: not installed
matplotlib : not installed
更新:我在这里提出了一个 github 问题,目前似乎不支持所有类型的流式传输。因此,我正在寻找解决这种情况的方法,或者使用极地进行此操作的任何替代方法 https://github.com/pola-rs/polars/issues/6603
我不知道幕后发生了什么,但我发现对我有用的一件事是设置
.collect(streaming=True)
,甚至设置pl.Config.set_streaming_chunk_size()
,如果它仍然会破坏内存。
我们可以看到目前
pl.cocat_list()
目前不支持流媒体,这意味着我们将无法.sink_parquet()
:
(pl.LazyFrame({'a':'word', 'b': 'word2'})
.with_columns(joined = pl.concat_list(pl.col('a'),
pl.col('b'))
)
.explain(streaming=True))
WITH_COLUMNS:
[col("a").list.concat([col("b")]).alias("joined")]
--- STREAMING
DF ["a", "b"]; PROJECT */2 COLUMNS; SELECTION: "None" --- END STREAMING
DF []; PROJECT */0 COLUMNS; SELECTION: "None"
因此,我将使用以下内容:
pl.Config.set_streaming_chunk_size(1000)
(pl.LazyFrame({'a':'word', 'b': 'word2'})
.with_columns(joined = pl.concat_list(pl.col('a'),
pl.col('b'))
)
.collect(streaming=True)
.write_parquet('test.parquet')
)
在实践中,由于我对幕后发生的事情一无所知,这似乎会尽可能多地进行流式传输,并且一旦到达需要它的操作,就会诉诸于内存中的收集。
对于特别大的数据,例如列表或字符串几乎太大而无法放入内存,设置较小的块大小有时允许我写入文件。