我正在使用 pandas 使用带有分区的 to_parquet 函数编写 parquet 文件。示例:
df.to_parquet('gs://bucket/path', partition_cols=['key'])
问题是每次我运行代码时。它会在分区中添加一个新的 parquet 文件,当您读取数据时,您会在每次运行脚本时获取所有数据。本质上,数据每次都会附加。
有没有办法在每次使用 pandas 写入时都覆盖数据?
我发现dask对于阅读和书写镶木地板很有帮助。它默认写入时的文件名(您可以更改),如果您使用相同的名称,它将替换镶木地板文件,我相信这就是您正在寻找的文件。您可以通过将“append”设置为 True 将数据附加到分区,这对我来说更直观,或者您可以将“overwrite”设置为 True,这将在写入文件之前删除分区/文件夹中的所有文件。通过在读取时将分区列包含在数据帧中,读取 parquet 效果也很好。
https://docs.dask.org/en/stable/ generated/dask.dataframe.to_parquet.html
请参阅下面的一些代码,我用来满足自己对 dask.dataframe.to_parquet 行为的要求:
import pandas as pd
from dask import dataframe as dd
import numpy as np
dates = pd.date_range("2015-01-01", "2022-06-30")
df_len = len(dates)
df_1 = pd.DataFrame(np.random.randint(0, 1000, size=(df_len, 1)), columns=["value"])
df_2 = pd.DataFrame(np.random.randint(0, 1000, size=(df_len, 1)), columns=["value"])
df_1["date"] = dates
df_1["YEAR"] = df_1["date"].dt.year
df_1["MONTH"] = df_1["date"].dt.month
df_2["date"] = dates
df_2["YEAR"] = df_2["date"].dt.year
df_2["MONTH"] = df_2["date"].dt.month
ddf_1 = dd.from_pandas(df_1, npartitions=1)
ddf_2 = dd.from_pandas(df_2, npartitions=1)
name_function = lambda x: f"monthly_data_{x}.parquet"
ddf_1.to_parquet(
"dask_test_folder",
name_function=name_function,
partition_on=["YEAR", "MONTH"],
write_index=False,
)
print(ddf_1.head())
ddf_first_write = dd.read_parquet("dask_test_folder/YEAR=2015/MONTH=1")
print(ddf_first_write.head())
ddf_2.to_parquet(
"dask_test_folder",
name_function=name_function,
partition_on=["YEAR", "MONTH"],
write_index=False,
)
print(ddf_2.head())
ddf_second_write = dd.read_parquet("dask_test_folder/YEAR=2015/MONTH=1")
print(ddf_second_write.head())
是的,有。您需要阅读 pandas 文档,您会看到 to_parquet 支持 **kwargs 并默认使用 engine:pyarrow 。这样你就可以看到 pyarrow 文档了。在那里你会看到有两种方法可以做到这一点。一,通过使用partition_filename_cb,它需要遗留支持并且将被弃用。 二、使用basename_template,这是新的方式。这是因为运行可调用/lambda 来命名每个分区的性能问题。您需要传递一个字符串:
"string_{i}"
。仅适用于关闭旧版支持的情况。保存的文件将为“string_0”,“string_1”...
您不能同时使用两者。
def write_data(
df: pd.DataFrame,
path: str,
file_format="csv",
comp_zip="snappy",
index=False,
partition_cols: list[str] = None,
basename_template: str = None,
storage_options: dict = None,
**kwargs,
) -> None:
getattr(pd.DataFrame, f"to_{file_format}")(
df,
f"{path}.{file_format}",
compression=comp_zip,
index=index,
partition_cols=partition_cols,
basename_template=basename_template,
storage_options={"token": creds},
**kwargs,
)
试试这个。
将
existing_data_behavior
与 delete_matching
一起使用。例如:
df.to_parquet(
'gs://bucket/path',
partition_cols=['key'],
existing_data_behavior='delete_matching'
)
来自 pyarrow 文档:
“delete_matching”在编写分区数据集时很有用。第一次遇到每个分区目录时,整个目录将被删除。这允许您完全覆盖旧分区。