我在 Spark DF 上使用全局排序,当我启用 AQE 和后洗牌合并时,排序操作后的分区分布比以前更差。
"spark.sql.adaptive.enabled" -> "true",
"spark.sql.adaptive.coalescePartitions.enabled" -> "true",
"spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
"spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
"spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"
我的查询在高层次上看起来:
.readFromKafka
.deserializeJsonToRow
.cache
.sort(const_part, column which can cause skew, some salt columns)
.writeToS3
我的第一个假设是,即使范围很小,峰值也会太大。 但我检查并确认按范围重新分区可以使我在记录中得到良好的分布,但大小不好。我有近 200 个分区,它们的记录量几乎相同,但大小差异高达 9 倍,从 ~100Mb 到 ~900mb。 但通过 AEQ 并重新分区为 18000 个小范围,最小分区为 18mib,最大分区为 1.8Gib。 这种情况比没有 AEQ 的情况要糟糕得多。 需要强调的是,我使用 Spark UI -> Stage 选项卡的详细信息中的指标来识别分区大小(以字节为单位),并且我有自己的记录日志。
所以我开始调试这个问题,但是AQE没有足够的输入和输出日志
ShufflePartitionsUtil.coalescePartitions
。
这就是为什么我将查询重写为 repartitionByRange.sortWithingPartitoins。以及通过额外的日志记录来优化物理计划。
我的日志告诉我,我最初的想法是正确的。
Input shuffleId:2 partitions:17999
Max partition size :27362117
Min partition size :8758435
还有
Number of shuffle stages to coalesce 1
Reduce number of partitions from 17999 to 188
Output partition maxsize :312832323
Output partition min size :103832323
最小大小如此不同,因为最后一个分区的大小是预期的。 TRACE 日志级别显示 99% 的分区接近 290mib。
但是为什么spark UI显示出如此不同的结果呢? ->
spark UI 可能是错误的吗? ->
也许吧,但是除了任务大小之外,任务的持续时间也太大了,这让我觉得spark UI还可以。
所以我的假设是问题出在我所在阶段的
MapOutputStatistics
。但它总是坏掉还是只在我的情况下坏掉? ->
只有我的情况吗? -> 我做了一些检查来确认。
所有这些检查让我认为
MapOutputStatistics
仅对我的情况是错误的。可能是如何与 Kafka 源关联的问题,或者我的 Kafka 输入数据分布非常不均匀。
问题:
附注 我还想提一下,我的输入 Kafka 数据帧是 2160,甚至不是分布式分区 -> 某些分区可能比其他分区大 2 倍。从具有 720 个分区和
minPartitions
选项 * 3 的 Kafka 主题读取。
https://www.mail-archive.com/[电子邮件受保护]/msg26851.html
这就是答案。
在缓存数据中启用 AQE 最坏的情况是不丢失 使用/重用缓存的机会,而只是额外的洗牌,如果 没有 AQE 时,outputPartitioning 恰好匹配,但之后不匹配 阿QE。这种情况发生的几率相当低。
private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
.internal()
.doc("Number of partitions to determine if MapStatus should use HighlyCompressedMapStatus")
.version("2.4.0")
.intConf
.checkValue(v => v > 0, "The value should be a positive integer.")
.createWithDefault(2000)
当您的分区数大于MapOutputStatistics
时,
spark.shuffle.minNumPartitionsToHighlyCompress
将被压缩,在这种情况下AQE可能会被破坏。