我正在尝试以
pandas dataframe
模式将 parquet
写入 append
文件格式(在最新的 pandas 版本 0.21.0 中引入)。但是,该文件不会附加到现有文件,而是会被新数据覆盖。我错过了什么?
写入语法是
df.to_parquet(path, mode='append')
读取语法是
pd.read_parquet(path)
看起来可以使用 fastparquet 将行组附加到现有的镶木地板文件中。这是一个非常独特的功能,因为大多数库没有这个实现。
以下来自pandas doc:
DataFrame.to_parquet(path, engine='auto', compression='snappy', index=None, partition_cols=None, **kwargs)
我们必须传入引擎和 **kwargs。
- engine{‘auto’, ‘pyarrow’, ‘fastparquet’}
- **kwargs - 传递给 parquet 库的附加参数。
**kwargs - 这里我们需要传递的是: append=True (来自 fastparquet)
import pandas as pd
from pathlib import Path
df = pd.DataFrame({'col1': [1, 2,], 'col2': [3, 4]})
file_path = Path("D:\\dev\\output.parquet")
if file_path.exists():
df.to_parquet(file_path, engine='fastparquet', append=True)
else:
df.to_parquet(file_path, engine='fastparquet')
如果追加设置为 True 并且文件不存在,那么您将看到以下错误
AttributeError: 'ParquetFile' object has no attribute 'fmd'
运行上面的脚本 3 次,我在 parquet 文件中有以下数据。
如果我检查元数据,我可以看到这导致了 3 行组。
注:
如果写入太多小行组,追加可能会效率低下。通常建议的行组大小接近 100,000 或 1,000,000 行。与非常小的行组相比,这有一些好处。压缩效果会更好,因为压缩仅在行组内运行。由于每个行组都存储自己的统计信息,因此存储统计信息所花费的开销也会更少。
要追加,请执行以下操作:
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
dataframe = pd.read_csv('content.csv')
output = "/Users/myTable.parquet"
# Create a parquet table from your dataframe
table = pa.Table.from_pandas(dataframe)
# Write direct to your parquet file
pq.write_to_dataset(table , root_path=output)
这将自动附加到您的表格中。
我使用了 awswrangler 库。它就像一个魅力
以下是参考文档
https://aws-data-wrangler.readthedocs.io/en/latest/stubs/awswrangler.s3.to_parquet.html
我已从 kinesis 流中读取并使用 kinesis-python 库来使用消息并写入 s3 。我没有包含 json 的处理逻辑,因为这篇文章处理的是无法将数据附加到 s3 的问题。在 aws sagemaker jupyter 中执行
以下是我使用的示例代码:
!pip install awswrangler
import awswrangler as wr
import pandas as pd
evet_data=pd.DataFrame({'a': [a], 'b':[b],'c':[c],'d':[d],'e': [e],'f':[f],'g': [g]},columns=['a','b','c','d','e','f','g'])
#print(evet_data)
s3_path="s3://<your bucker>/table/temp/<your folder name>/e="+e+"/f="+str(f)
try:
wr.s3.to_parquet(
df=evet_data,
path=s3_path,
dataset=True,
partition_cols=['e','f'],
mode="append",
database="wat_q4_stg",
table="raw_data_v3",
catalog_versioning=True # Optional
)
print("write successful")
except Exception as e:
print(str(e))
任何有帮助的澄清。在另外几篇文章中,我读过读取数据并再次覆盖。但随着数据变大,这个过程会变慢。效率很低
pandas.to_parquet()
中没有追加模式。您可以做的是读取现有文件,更改它,然后写回它并覆盖它。
使用fastparquet写入功能
from fastparquet import write
write(file_name, df, append=True)
据我了解,该文件必须已经存在。
API 可在此处使用(至少目前):https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
如果您正在考虑使用分区:
根据 Pyarrow doc(这是使用分区时在幕后调用的函数),您可能希望将
partition_cols
与唯一的 basename_template
名称结合起来。即类似以下内容:
df.to_parquet(root_path, partition_cols=["..."], basename_template="{i}")
如果
basename_template
不与现有数据重叠,您可以省略 df
。但如果确实有重叠,则会创建重复的 .parquet
文件。
如果您的分区列包含时间戳,这非常方便。这样,您实际上可以拥有一个“滚动”DataFrame,并且不会重复写入,只会创建与新时间相对应的新文件。
Pandas
to_parquet()
可以处理单个文件以及包含多个文件的目录。如果文件已经存在,Pandas 会默默地覆盖该文件。要附加到镶木地板对象,只需将新文件添加到同一镶木地板目录即可。
os.makedirs(path, exist_ok=True)
# write append (replace the naming logic with what works for you)
filename = f'{datetime.datetime.utcnow().timestamp()}.parquet'
df.to_parquet(os.path.join(path, filename))
# read
pd.read_parquet(path)