I have an AWS Glue job that uses "Glue 4.0 - Supports spark 3.3, Scala 2, Python 3". It reads individual Parquet files and writes them to Delta Lake. I previously used
"write.parquet.compression-codec": "snappy"
Now I want to switch to Zstandard instead:
"write.parquet.compression-codec": "zstd"
Here is my new code:
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": ["s3://my-bucket/data/raw-parquet/motor/"],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

additional_options = {
    "path": "s3://my-bucket/data/new-zstd-delta-tables/motor/",  # <- An empty new folder
    "write.parquet.compression-codec": "zstd",  # <- Changed to zstd
    "mergeSchema": "true",
}

sink_to_delta_lake_node3_df = S3bucket_node1.toDF()
sink_to_delta_lake_node3_df.write.format("delta").options(**additional_options).mode(
    "overwrite"
).save()

job.commit()
However, the resulting parquet files in Delta Lake still look like
part-00107-85b2560f-81a4-4d05-8d09-5df143685dbf.c000.snappy.parquet
Note the snappy in the file name.
I also verified that the file contents are in fact compressed with the "snappy" codec:
import pyarrow.parquet as pq
parquet_file_path = "part-00107-85b2560f-81a4-4d05-8d09-5df143685dbf.c000.snappy.parquet"
print(pq.ParquetFile(parquet_file_path).metadata.row_group(0).column(0).compression)
# It prints "SNAPPY"
How do I correctly write to Delta Lake with the "zstd" compression codec in AWS Glue? Thanks!
You need to use the
compression
option (doc). The "write.parquet.compression-codec" key you set is an Apache Iceberg table property, so the Delta writer silently ignores it. For example:
spark.range(100).write.format("delta")\
.option("compression", "zstd").mode("overwrite").save("zstd.delta")
This will give you the desired output (tested on Databricks and on local Spark):
# ls zstd.delta
_delta_log/
part-00000-b7078370-6b8f-4632-8071-8ee2dbc61194-c000.zstd.parquet
part-00001-3201e316-1cc7-4f78-9945-f7b4b21922b0-c000.zstd.parquet
part-00002-d25fb0ed-bf7d-468f-bd98-df480c2acabd-c000.zstd.parquet
part-00003-b9e60fb8-2d7a-4e4f-981c-a4c9314c9b41-c000.zstd.parquet
part-00004-224f9282-930b-4a89-ba08-2c4b752781dd-c000.zstd.parquet
part-00005-4671308a-3e6a-4cba-83ad-bb6e7f404d68-c000.zstd.parquet
part-00006-2b08bbf9-7ece-4ccd-828b-88713fe226f9-c000.zstd.parquet
part-00007-53fc0ebb-29b1-496c-bcce-f2994ec04226-c000.zstd.parquet
part-00008-f0cf609d-d1c4-4805-8586-dd6e9481cc9b-c000.zstd.parquet
part-00009-a918c5c8-77ea-4f6d-b559-d36740e5a3bb-c000.zstd.parquet
part-00010-4a47e50d-dfdc-4f7f-826a-7e273a2fd404-c000.zstd.parquet
part-00011-95625e5a-7130-4f45-bf6c-bc721fe7561b-c000.zstd.parquet
Alternatively, you can set it globally:
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")