“zstd”压缩编解码器有 22 个压缩级别。我读了这个 Uber 博客。关于压缩时间和文件大小,我使用
df.to_parquet
与我们的数据进行了验证,并得到了相同的实验结果。因此,我希望在我们的 AWS Glue Spark 作业中将压缩级别设置为 19,该作业还将数据写入 Delta Lake。
我的 AWS Glue 作业正在使用“Glue 4.0 - 支持 Spark 3.3、Scala 2、Python 3”版本。
这是我的代码
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
format_options={},
connection_type="s3",
format="parquet",
connection_options={
"paths": [
"s3://my-bucket/data/raw-parquet/motor/"
],
"recurse": True,
},
transformation_ctx="S3bucket_node1",
)
additional_options = {
"path": "s3://my-bucket/data/delta-tables/motor/",
"mergeSchema": "true",
}
sink_to_delta_lake_node3_df = S3bucket_node1.toDF()
sink_to_delta_lake_node3_df.write.format("delta").options(**additional_options).mode(
"overwrite"
).save()
job.commit()
基于https://stackoverflow.com/a/74276832/2000548,我可能会使用
--conf parquet.compression.codec.zstd.level=19
。 (请注意,写答案的作者说它似乎不起作用。另一方面,博客中的 Uber 使它起作用,所以我认为可能有一种方法可以在 Spark 中正确设置“zstd”压缩级别)
这是我的
--conf
:
--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19
我通过“作业详细信息 -> 高级属性 -> 作业参数”在我的 Glue 作业中添加了这些配置:
--conf
spark.sql.parquet.compression.codec=zstd --conf parquet.compression.codec.zstd.level=19
(这是在 AWS Glue 作业中设置多个 --conf
的
当前方法,我之前已经验证过它可以按预期工作)
我与压缩级别 3 进行了比较。但是,压缩级别 19 和 3 在 Delta 表中生成了完全相同的 parquet 文件大小 97 MB(97,002,126 字节)。
为了确保此数据的不同“zstd”压缩级别具有不同的大小,我尝试了Python代码:
df.to_parquet(
local_parquet_path,
engine="pyarrow",
compression="zstd",
compression_level=19
)
使用压缩级别 19 的文件大小是使用压缩级别 3 的文件大小的 92%,这意味着对于该数据,当压缩级别相差很大时,文件大小存在差异。所以我觉得Spark中的
--conf parquet.compression.codec.zstd.level=19
并没有达到预期的效果。
如何在 AWS Glue 作业中设置“zstd”压缩级别?谢谢!
尝试过这些组合:
--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19
--conf spark.sql.parquet.compression.codec=zstd
--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19
--conf spark.sql.parquet.compression.codec=zstd
--conf parquet.compression.codec.zstd.level=19
--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19
与未设置任何压缩级别或设置为 3 的情况相比,在 Delta Lake 中仍然获得完全相同的“zstd”镶木地板文件大小。
如果只用
--conf spark.io.compression.codec=zstd
--conf spark.io.compression.zstd.level=19
这实际上使最终文件成为 Snappy 格式,如
part-00000-1c8c7408-b14f-4ba1-9030-ecc437a2f8d3-c000.snappy.parquet
。这意味着 spark.io.compression.codec=zstd
没有按预期工作。
spark.io.compression.codec
不用于写入生成的镶木地板文件 - 它用于写入压缩的内部数据,例如 RDD 分区、事件日志、广播变量和随机输出(检查文档)。
从链接的 Jira 中的讨论来看,这似乎在 Spark 中不可配置(但也许我需要检查它的源代码)。