有没有办法使用带分区的 pandas to_parquet 覆盖现有数据?

问题描述 投票:0回答:3

我正在使用 pandas 使用带有分区的 to_parquet 函数编写 parquet 文件。示例:

df.to_parquet('gs://bucket/path', partition_cols=['key'])

问题是每次我运行代码时。它会在分区中添加一个新的 parquet 文件,当您读取数据时,您会在每次运行脚本时获取所有数据。本质上,数据每次都会附加。

有没有办法在每次使用 pandas 写入时都覆盖数据?

python pandas parquet
3个回答
0
投票

我发现对于阅读和书写镶木地板很有帮助。它默认写入时的文件名(您可以更改),如果您使用相同的名称,它将替换镶木地板文件,我相信这就是您正在寻找的文件。您可以通过将“append”设置为 True 将数据附加到分区,这对我来说更直观,或者您可以将“overwrite”设置为 True,这将在写入文件之前删除分区/文件夹中的所有文件。通过在读取时将分区列包含在数据帧中,读取 parquet 效果也很好。

https://docs.dask.org/en/stable/ generated/dask.dataframe.to_parquet.html

请参阅下面的一些代码,我用来满足自己对 dask.dataframe.to_parquet 行为的要求:

import pandas as pd
from dask import dataframe as dd
import numpy as np

dates = pd.date_range("2015-01-01", "2022-06-30")
df_len = len(dates)
df_1 = pd.DataFrame(np.random.randint(0, 1000, size=(df_len, 1)), columns=["value"])
df_2 = pd.DataFrame(np.random.randint(0, 1000, size=(df_len, 1)), columns=["value"])

df_1["date"] = dates
df_1["YEAR"] = df_1["date"].dt.year
df_1["MONTH"] = df_1["date"].dt.month

df_2["date"] = dates
df_2["YEAR"] = df_2["date"].dt.year
df_2["MONTH"] = df_2["date"].dt.month

ddf_1 = dd.from_pandas(df_1, npartitions=1)
ddf_2 = dd.from_pandas(df_2, npartitions=1)

name_function = lambda x: f"monthly_data_{x}.parquet"

ddf_1.to_parquet(
    "dask_test_folder",
    name_function=name_function,
    partition_on=["YEAR", "MONTH"],
    write_index=False,
)

print(ddf_1.head())
ddf_first_write = dd.read_parquet("dask_test_folder/YEAR=2015/MONTH=1")
print(ddf_first_write.head())

ddf_2.to_parquet(
    "dask_test_folder",
    name_function=name_function,
    partition_on=["YEAR", "MONTH"],
    write_index=False,
)

print(ddf_2.head())
ddf_second_write = dd.read_parquet("dask_test_folder/YEAR=2015/MONTH=1")
print(ddf_second_write.head())


0
投票

是的,有。您需要阅读 pandas 文档,您会看到 to_parquet 支持 **kwargs 并默认使用 engine:pyarrow 。这样你就可以看到 pyarrow 文档了。在那里你会看到有两种方法可以做到这一点。一,通过使用partition_filename_cb,它需要遗留支持并且将被弃用。 二、使用basename_template,这是新的方式。这是因为运行可调用/lambda 来命名每个分区的性能问题。您需要传递一个字符串:

"string_{i}"
。仅适用于关闭旧版支持的情况。保存的文件将为“string_0”,“string_1”... 您不能同时使用两者。

def write_data(
    df: pd.DataFrame,
    path: str,
    file_format="csv",
    comp_zip="snappy",
    index=False,
    partition_cols: list[str] = None,
    basename_template: str = None,
    storage_options: dict = None,
    **kwargs,
) -> None:
    getattr(pd.DataFrame, f"to_{file_format}")(
        df,
        f"{path}.{file_format}",
        compression=comp_zip,
        index=index,
        partition_cols=partition_cols,
        basename_template=basename_template,
        storage_options={"token": creds},
        **kwargs,
    )

试试这个。


0
投票

existing_data_behavior
delete_matching
一起使用。例如:

df.to_parquet(
    'gs://bucket/path', 
    partition_cols=['key'], 
    existing_data_behavior='delete_matching'
)

来自 pyarrow 文档

“delete_matching”在编写分区数据集时很有用。第一次遇到每个分区目录时,整个目录将被删除。这允许您完全覆盖旧分区。

© www.soinside.com 2019 - 2024. All rights reserved.