我有一个 pandas 数据框。我想将此数据帧写入 S3 中的 parquet 文件。 我需要相同的示例代码。我尝试用谷歌搜索它。但我无法获得有效的示例代码。
供您参考,我有以下代码作品。
s3_url = 's3://bucket/folder/bucket.parquet.gzip'
df.to_parquet(s3_url, compression='gzip')
为了使用
to_parquet
,您需要安装pyarrow
或fastparquet
。另外,请确保位于 config
文件夹中的 credentials
和 .aws
文件中包含正确的信息。
编辑:此外,需要
s3fs
。请参阅https://stackoverflow.com/a/54006942/1862909
下面的函数在缓冲区中获取镶木地板输出,然后将 buffer.values() 写入 S3,无需在本地保存镶木地板
此外,由于您正在创建 s3 客户端,因此您可以使用 aws s3 密钥创建凭证,这些凭证可以存储在本地、气流连接或 aws Secrets Manager 中
def dataframe_to_s3(s3_client, input_datafame, bucket_name, filepath, format):
if format == 'parquet':
out_buffer = BytesIO()
input_datafame.to_parquet(out_buffer, index=False)
elif format == 'csv':
out_buffer = StringIO()
input_datafame.to_parquet(out_buffer, index=False)
s3_client.put_object(Bucket=bucket_name, Key=filepath, Body=out_buffer.getvalue())
S3_client 只是一个 boto3 客户端对象。希望这有帮助!
首先确保你已经安装了 pandas 和 pyarrow 或 fastparquet。
然后安装 boto3 和 aws cli。使用 aws cli 设置配置和凭证文件,位于 .aws 文件夹。
这是一个简单的脚本,使用 pyarrow 和 boto3 创建临时镶木地板文件,然后发送到 AWS S3。
不包括导入的示例代码:
import pyarrow as pa
import pyarrow.parquet as pq
def main():
data = {0: {"data1": "value1"}}
df = pd.DataFrame.from_dict(data, orient='index')
write_pandas_parquet_to_s3(
df, "bucket", "folder/test/file.parquet", ".tmp/file.parquet")
def write_pandas_parquet_to_s3(df, bucketName, keyName, fileName):
# dummy dataframe
table = pa.Table.from_pandas(df)
pq.write_table(table, fileName)
# upload to s3
s3 = boto3.client("s3")
BucketName = bucketName
with open(fileName) as f:
object_data = f.read()
s3.put_object(Body=object_data, Bucket=BucketName, Key=keyName)
对于 python 3.6+,AWS 有一个名为 aws-data-wrangler 的库,有助于 Pandas/S3/Parquet 之间的集成
安装;
pip install awswrangler
如果您想将 pandas 数据框作为 parquet 文件写入 S3;
import awswrangler as wr
wr.s3.to_parquet(
dataframe=df,
path="s3://my-bucket/key/my-file.parquet"
)
只是提供一个使用 kwargs 强制覆盖的进一步示例。
我的用例是分区结构确保如果我重新处理输入文件,输出镶木地板应覆盖分区中的所有内容。为此,我使用传递给 pyarrow 的 kwargs:
s3_url = "s3://<your-bucketname>/<your-folderpath>/"
df.to_parquet(s3_url,
compression='snappy',
engine = 'pyarrow',
partition_cols = ["GSDate","LogSource", "SourceDate"],
existing_data_behavior = 'delete_matching')
最后一个参数(existing_data_behaviour)是传递给底层 pyarrow write_dataset 的 **kwargs 的一部分。 (https://arrow.apache.org/docs/python/ generated/pyarrow.dataset.write_dataset.html#pyarrow.dataset.write_dataset)
否则,重新运行会创建重复的数据。如上所述,这需要
s3fs