如何将数据从Python函数流式传输到镶木地板文件?

问题描述 投票:0回答:1

我有一个 python 文件,它对文件列表执行一些操作。生成的数据帧约为 6 GB,因此我需要使用数据库或使用 parquet 对其进行压缩。问题是,数据是从函数生成的,所以我想知道如何将批量数据流式传输到镶木地板文件中。我在 SO 上发现的是获取已经创建的数据集并将其分解并发送到 Parquet。我也主要使用 Polars 而不是 Pandas,所以如果有人有任何建议那就太好了。

from time import sleep


def min_and_max(x: list[int]) -> tuple[int, int]:
    sleep(30)  # simuluating an expensive operation
    return (min(x), max(x))


numbers = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]

for num in numbers:
    print(min_and_max(num))
    # I need a write_parquet function here.
    # Saving the results in a list or another data structure 
    # is taking too much memory.
python streaming parquet python-polars
1个回答
0
投票

这是我想出的 Polars 式的解决方案。它利用了

write_parquet
函数在后端调用 Pyarrow parquet 包的事实。为了不使事情复杂化,我将使用我的虚拟示例,但只要知道你可以

from pathlib import Path
from time import sleep


# This part about the file path is not that relevant - just including for completeness
filepath = Path.cwd()
print(f"Dataframe will be saved here: {filepath}")


def min_and_max(x: list[int]) -> list[list[pl.Series]]:
    sleep(30)  # simuluating an expensive operation
    return [
        [pl.Series("min", min(x), dtype=pl.UInt8)], 
        [pl.Series("max", max(x), dtype=pl.UInt8)]
    ] # need to put all the return values in a nested list of lists


numbers = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]

for i, v in enumerate(numbers):
    pl.from_records(
        min_and_max(numbers[i]), schema=["min", "max"]
    ).write_parquet(
        file=f"{filepath / "num_file"_i}.parquet", 
        compression="zstd", 
        compression_level=20, # experiment with this - see docs
        use_pyarrow=True
    )

有一个 pyarrow_options 允许您添加其他选项。这将逐步创建 parquet 文件并将它们发送到您指定的 parquet 文件。

© www.soinside.com 2019 - 2024. All rights reserved.