pandas 通过附加将数据帧写入 parquet 格式

问题描述 投票:0回答:7

我正在尝试以

pandas dataframe
模式将
parquet
写入
append
文件格式(在最新的 pandas 版本 0.21.0 中引入)。但是,该文件不会附加到现有文件,而是会被新数据覆盖。我错过了什么?

写入语法是

df.to_parquet(path, mode='append')

读取语法是

pd.read_parquet(path)
python apache pandas parquet
7个回答
21
投票

看起来可以使用 fastparquet 将行组附加到现有的镶木地板文件中。这是一个非常独特的功能,因为大多数库没有这个实现。

以下来自pandas doc

DataFrame.to_parquet(path, engine='auto', compression='snappy', index=None, partition_cols=None, **kwargs)

我们必须传入引擎和 **kwargs。

  • engine{‘auto’, ‘pyarrow’, ‘fastparquet’}
  • **kwargs - 传递给 parquet 库的附加参数。

**kwargs - 这里我们需要传递的是: append=True (来自 fastparquet

import pandas as pd
from pathlib import Path

df = pd.DataFrame({'col1': [1, 2,], 'col2': [3, 4]})
file_path = Path("D:\\dev\\output.parquet")

if file_path.exists():
  df.to_parquet(file_path, engine='fastparquet', append=True)
else:
  df.to_parquet(file_path, engine='fastparquet')

如果追加设置为 True 并且文件不存在,那么您将看到以下错误

AttributeError: 'ParquetFile' object has no attribute 'fmd'

运行上面的脚本 3 次,我在 parquet 文件中有以下数据。

如果我检查元数据,我可以看到这导致了 3 行组。


注:

如果写入太多小行组,追加可能会效率低下。通常建议的行组大小接近 100,000 或 1,000,000 行。与非常小的行组相比,这有一些好处。压缩效果会更好,因为压缩仅在行组内运行。由于每个行组都存储自己的统计信息,因此存储统计信息所花费的开销也会更少。


7
投票

要追加,请执行以下操作:

import pandas as pd 
import pyarrow.parquet as pq
import pyarrow as pa

dataframe = pd.read_csv('content.csv')
output = "/Users/myTable.parquet"

# Create a parquet table from your dataframe
table = pa.Table.from_pandas(dataframe)

# Write direct to your parquet file
pq.write_to_dataset(table , root_path=output)

这将自动附加到您的表格中。


3
投票

我使用了 awswrangler 库。它就像一个魅力

以下是参考文档

https://aws-data-wrangler.readthedocs.io/en/latest/stubs/awswrangler.s3.to_parquet.html

我已从 kinesis 流中读取并使用 kinesis-python 库来使用消息并写入 s3 。我没有包含 json 的处理逻辑,因为这篇文章处理的是无法将数据附加到 s3 的问题。在 aws sagemaker jupyter 中执行

以下是我使用的示例代码:

!pip install awswrangler
import awswrangler as wr
import pandas as pd
evet_data=pd.DataFrame({'a': [a], 'b':[b],'c':[c],'d':[d],'e': [e],'f':[f],'g': [g]},columns=['a','b','c','d','e','f','g'])
#print(evet_data)
s3_path="s3://<your bucker>/table/temp/<your folder name>/e="+e+"/f="+str(f)
try:
    wr.s3.to_parquet(
    df=evet_data,
    path=s3_path,
    dataset=True,
    partition_cols=['e','f'],
    mode="append",
    database="wat_q4_stg",
    table="raw_data_v3",
    catalog_versioning=True  # Optional
    )
    print("write successful")       
except Exception as e:
    print(str(e))

任何有帮助的澄清。在另外几篇文章中,我读过读取数据并再次覆盖。但随着数据变大,这个过程会变慢。效率很低


1
投票

pandas.to_parquet()
中没有追加模式。您可以做的是读取现有文件,更改它,然后写回它并覆盖它。


1
投票

使用fastparquet写入功能

from fastparquet import write

write(file_name, df, append=True)

据我了解,该文件必须已经存在。

API 可在此处使用(至少目前):https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write


0
投票

如果您正在考虑使用分区:

根据 Pyarrow doc(这是使用分区时在幕后调用的函数),您可能希望将

partition_cols
与唯一的
basename_template
名称结合起来。即类似以下内容:

df.to_parquet(root_path, partition_cols=["..."], basename_template="{i}")

如果

basename_template
不与现有数据重叠,您可以省略
df
。但如果确实有重叠,则会创建重复的
.parquet
文件。

如果您的分区列包含时间戳,这非常方便。这样,您实际上可以拥有一个“滚动”DataFrame,并且不会重复写入,只会创建与新时间相对应的新文件。


-1
投票

Pandas

to_parquet()
可以处理单个文件以及包含多个文件的目录。如果文件已经存在,Pandas 会默默地覆盖该文件。要附加到镶木地板对象,只需将新文件添加到同一镶木地板目录即可。

os.makedirs(path, exist_ok=True)

# write append (replace the naming logic with what works for you)
filename = f'{datetime.datetime.utcnow().timestamp()}.parquet'
df.to_parquet(os.path.join(path, filename))

# read
pd.read_parquet(path)
© www.soinside.com 2019 - 2024. All rights reserved.