优化Oracle以Parquet Spark Write

问题描述 投票:0回答:1

对 Spark 非常陌生,所以尝试通过我继承的这个简单任务来学习。

任务是从 Oracle 数据库中提取数据(使用分区),然后将其作为 Parquet 文件写入 S3 兼容存储。

简而言之,代码看起来像这样:

val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select * from table1 where c1>0)")
.option("partitionColumn", "partition_col")
.option("lowerBound", "1")
.option("upperBound", "9999999")
.option("numPartitions", numPartitions)
.load()

df.write.mode("overwrite").format("parquet").save("s3a://location")

Spark 配置为:

    "spark.speculation": "false"
    "spark.kryoserializer.buffer": "256m"
    "spark.kryoserializer.buffer.max": "2047m"
    "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a": "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"
    "spark.hadoop.fs.s3a.committer.name": "directory"
    "spark.hadoop.fs.s3a.committer.staging.conflict-mode": "append"
    "spark.hadoop.fs.s3a.multipart.size": "256M"
    "spark.hadoop.fs.s3a.multipart.threshold": "256M"
    "spark.hadoop.fs.s3a.block.size": "256M"
    "spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
    "spark.sql.parquet.output.committer.class": "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
    "spark.sql.files.ignoreCorruptFiles": "true"
    "spark.sql.legacy.parquet.datetimeRebaseModeInRead": "LEGACY"
    "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "LEGACY"
    "spark.sql.legacy.timeParserPolicy": "LEGACY"
    "spark.sql.shuffle.partitions": "200"
    "spark.default.parallelism": "200"
    "spark.hadoop.fs.s3a.fast.upload": "true"

spark 作业大约需要一个半小时才能运行,我正在尝试找出是否有办法减少该时间。 生成的表约为 800GB,并且我已确保所选的partition_col确实是均匀分布的。现在我并不真正关心生成的镶木地板的文件大小/数量,我只想尽快完成镶木地板的编写。

目前集群固定有: 20 个执行器实例 每个执行器 2 个核心 每个执行器 80GB

一些观察 - 我注意到有一些长时间运行的任务,尽管它们似乎正在写入与其他任务相同的相对数量的数据。有人知道这里发生了什么事吗?看: https://i.imgur.com/HXOZCbE.png

如果有人有任何优化建议,我们将不胜感激!

到目前为止,我已经尝试使用不同的分区列和 numPartitions 从 80 到 1000,但无论如何它似乎都停留在 90 分钟左右。 我也尝试将 fs.s3a.fast.upload 设置为 true 但无济于事

oracle scala apache-spark etl
1个回答
0
投票

请不要为了提高吞吐量而在 Oracle 数据库中投入高度并发性。几乎可以肯定,数据库的容量超载了,特别是在存储阵列层。这将淹没 Exadata 的存储单元,并通过屋顶发送普通存储阵列的响应时间,影响使用它的每个人(通常是许多数据库)。

如果您按不是 Oracle 分区的分区键的列对作业进行分区,则每个线程必须扫描 100% 的表。如果您有 1000 个线程,则需要扫描整个表 1000 次,这是所需工作量的 1000 倍。那没有规模。我们在环境中一遍又一遍地看到这种情况,我们不可避免地必须将其关闭,因为它会导致其他人的性能下降。对作业进行分区的唯一负责任的方法是将您的分区与 Oracle 自己的分区保持一致。那么每个线程只读取属于它的段,因此不存在重叠的读取工作。 你的工作没有随着更高的并行性而改善的原因是因为它无法扩展。数据库根本无法读取您要求的数据量,而使用 1000 个线程读取的数据量要比使用 100 个线程读取的速度快。这已经超出了其限制。当人们要求超过 10 个并发线程时,我们通常会看到问题出现。 1000 已经超出了范围。

对这些作业进行分区的通常原因是为了克服通过网络获取结果的序列化问题。这是可以理解的。但你无法通过复合数据库的负载来解决这个问题。如果您无法将提取内容与原始表的实际 Oracle 分区保持一致,那么您可以考虑将加载分区临时表作为第一步,然后使用从各自分区并发提取的单个线程进入。如果 Spark 无法做到这一点,请考虑其他可以做到这一点的工具。

© www.soinside.com 2019 - 2024. All rights reserved.