How do I write to Delta Lake with the "zstd" compression codec in AWS Glue?


I have an AWS Glue job. It uses the "Glue 4.0 - Supports spark 3.3, Scala 2, Python 3" version.

It reads individual Parquet files and writes them to a Delta Lake table. I previously used

"write.parquet.compression-codec": "snappy"

and now I want to switch to Zstandard:

"write.parquet.compression-codec": "zstd"

Here is my new code:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://my-bucket/data/raw-parquet/motor/"
        ],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

additional_options = {
    "path": "s3://my-bucket/data/new-zstd-delta-tables/motor/", # <- An empty new folder
    "write.parquet.compression-codec": "zstd", # <- Changed to zstd
    "mergeSchema": "true",
}
sink_to_delta_lake_node3_df = S3bucket_node1.toDF()
sink_to_delta_lake_node3_df.write.format("delta").options(**additional_options).mode(
    "overwrite"
).save()

job.commit()

However, the resulting Parquet files in the Delta Lake table still look like

part-00107-85b2560f-81a4-4d05-8d09-5df143685dbf.c000.snappy.parquet

Note the snappy in the name. I also verified that the file contents are actually compressed with the "snappy" codec:

import pyarrow.parquet as pq

parquet_file_path = "part-00107-85b2560f-81a4-4d05-8d09-5df143685dbf.c000.snappy.parquet"
print(pq.ParquetFile(parquet_file_path).metadata.row_group(0).column(0).compression)
# It prints "SNAPPY"
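As a quick first pass before opening files with pyarrow, the codec can usually be read off the file name Spark generates, since Spark embeds it between the task suffix and the extension. A small helper sketching that (the naming pattern is an observed convention, not a guarantee; the metadata check above remains authoritative):

```python
import re


def codec_from_filename(name):
    """Best-effort guess of the Parquet codec from a Spark part-file name.

    Spark names files like "...c000.snappy.parquet" or "...c000.zstd.parquet".
    Files written with compression disabled have no codec segment, in which
    case this returns None.
    """
    m = re.search(r"\.c\d+\.([a-z0-9]+)\.parquet$", name)
    return m.group(1) if m else None


print(codec_from_filename(
    "part-00107-85b2560f-81a4-4d05-8d09-5df143685dbf.c000.snappy.parquet"))
# -> snappy
```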

How do I correctly write to Delta Lake with the "zstd" compression codec in AWS Glue? Thanks!

amazon-web-services apache-spark pyspark aws-glue delta-lake
1 Answer

You need to use the

compression
option (doc). For example:

spark.range(100).write.format("delta")\
  .option("compression", "zstd").mode("overwrite").save("zstd.delta")

This gives you the desired output (tested on Databricks and on local Spark):

# ls zstd.delta
_delta_log/
part-00000-b7078370-6b8f-4632-8071-8ee2dbc61194-c000.zstd.parquet
part-00001-3201e316-1cc7-4f78-9945-f7b4b21922b0-c000.zstd.parquet
part-00002-d25fb0ed-bf7d-468f-bd98-df480c2acabd-c000.zstd.parquet
part-00003-b9e60fb8-2d7a-4e4f-981c-a4c9314c9b41-c000.zstd.parquet
part-00004-224f9282-930b-4a89-ba08-2c4b752781dd-c000.zstd.parquet
part-00005-4671308a-3e6a-4cba-83ad-bb6e7f404d68-c000.zstd.parquet
part-00006-2b08bbf9-7ece-4ccd-828b-88713fe226f9-c000.zstd.parquet
part-00007-53fc0ebb-29b1-496c-bcce-f2994ec04226-c000.zstd.parquet
part-00008-f0cf609d-d1c4-4805-8586-dd6e9481cc9b-c000.zstd.parquet
part-00009-a918c5c8-77ea-4f6d-b559-d36740e5a3bb-c000.zstd.parquet
part-00010-4a47e50d-dfdc-4f7f-826a-7e273a2fd404-c000.zstd.parquet
part-00011-95625e5a-7130-4f45-bf6c-bc721fe7561b-c000.zstd.parquet

Alternatively, you can set it globally:

spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
© www.soinside.com 2019 - 2024. All rights reserved.