Spark AQE Post-Shuffle 分区合并无法按预期工作,甚至会导致某些分区中的数据倾斜。为什么?

问题描述 投票:0回答:2

我在 Spark DF 上使用全局排序,当我启用 AQE 和后洗牌合并时,排序操作后的分区分布比以前更差。

    "spark.sql.adaptive.enabled" -> "true",
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
    "spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"

我的查询在高层次上看起来:

.readFromKafka
.deserializeJsonToRow
.cache
.sort(const_part, column which can cause skew, some salt columns)
.writeToS3
  1. 可能导致倾斜的列 -> 是的,我的数据分布不佳,这就是我使用盐的原因。
  2. 我从Kafka读取数据,所以我使用Kafka分区+偏移列作为salt。
  3. 为什么在底层使用 reaprtitoinByRange 的 Sort 对我没有帮助,而我想启用 AQE? -> 现在我发现我的 Kafka 消息的大小差异太大。所以我看到范围重新分区后的分区具有几乎相同数量的记录,但字节数仍然非常不均匀。
  4. 为什么我认为 AQE 必须帮助我? -> 我想创建许多小范围,即使我的数据倾斜也不会超过〜50mb,因此后洗牌合并将能够将它们合并到目标大小(256mb)。就我而言,峰值最大 320mb 就可以了。

我的第一个假设是,即使范围很小,峰值也会太大。 但我检查并确认按范围重新分区可以使我在记录中得到良好的分布,但大小不好。我有近 200 个分区,它们的记录量几乎相同,但大小差异高达 9 倍,从 ~100Mb 到 ~900mb。 但通过 AEQ 并重新分区为 18000 个小范围,最小分区为 18mib,最大分区为 1.8Gib。 这种情况比没有 AEQ 的情况要糟糕得多。 需要强调的是,我使用 Spark UI -> Stage 选项卡的详细信息中的指标来识别分区大小(以字节为单位),并且我有自己的记录日志。

所以我开始调试这个问题,但是AQE没有足够的输入和输出日志

ShufflePartitionsUtil.coalescePartitions
。 这就是为什么我将查询重写为 repartitionByRange.sortWithingPartitoins。以及通过额外的日志记录来优化物理计划。 我的日志告诉我,我最初的想法是正确的。

  • map 和 write shuffle 阶段之后的输入分区被分割得足够小
  • 合并算法将它们收集到正确的数量,均匀分布在字节分区中。
Input shuffleId:2 partitions:17999
Max partition size :27362117
Min partition size :8758435

还有

Number of shuffle stages to coalesce 1
Reduce number of partitions from 17999 to 188
Output partition  maxsize :312832323
Output partition min size :103832323

最小大小如此不同,因为最后一个分区的大小是预期的。 TRACE 日志级别显示 99% 的分区接近 290mib。

  • 但是为什么spark UI显示出如此不同的结果呢? ->

  • spark UI 可能是错误的吗? ->

  • 也许吧,但是除了任务大小之外,任务的持续时间也太大了,这让我觉得spark UI还可以。

  • 所以我的假设是问题出在我所在阶段的

    MapOutputStatistics
    。但它总是坏掉还是只在我的情况下坏掉? ->

  • 只有我的情况吗? -> 我做了一些检查来确认。

    • 我从 s3(块大小为 120mb 的镶木地板文件)读取相同的数据集->并且 AQE 按预期工作。洗牌后合并返回给我 188,按大小和分区均匀分布。值得注意的是,s3 上的数据分布不均匀,但 Spark 在读取过程中将其分割为 259 个接近 120mb 大小的分区,主要是因为 parquet 块大小为 120mb。
    • 我从 Kafka 读取了相同的数据集,但从分区函数中排除了有倾斜的列 -> 并且 AQE 按预期工作。洗牌后合并返回给我 203,按大小、分区良好分布。
    • 我尝试禁用缓存 -> 这没有任何结果。我使用缓存,只是为了避免从kafka重复读取。因为按范围重新分区使用采样。
    • 我尝试禁用 AQE 并将 18000 个分区写入 s3 -> 结果符合预期,与我的合并输入日志显示的结果相同:17999 个文件,最小的接近 8mib,最大的 56mib。
  • 所有这些检查让我认为

    MapOutputStatistics
    仅对我的情况是错误的。可能是如何与 Kafka 源关联的问题,或者我的 Kafka 输入数据分布非常不均匀。

问题:

  • 那么有人知道我做错了什么吗?
  • 我可以如何处理输入数据以使后洗牌合并在我的情况下起作用?
  • 如果您认为我说得对,请发表评论。

附注 我还想提一下,我的输入 Kafka 数据帧是 2160,甚至不是分布式分区 -> 某些分区可能比其他分区大 2 倍。从具有 720 个分区和

minPartitions
选项 * 3 的 Kafka 主题读取。

apache-spark apache-spark-sql spark-kafka-integration spark3
2个回答
1
投票

https://www.mail-archive.com/[电子邮件受保护]/msg26851.html

这就是答案。

在缓存数据中启用 AQE 最坏的情况是不丢失 使用/重用缓存的机会,而只是额外的洗牌,如果 没有 AQE 时,outputPartitioning 恰好匹配,但之后不匹配 阿QE。这种情况发生的几率相当低。


0
投票
  private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
    ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
      .internal()
      .doc("Number of partitions to determine if MapStatus should use HighlyCompressedMapStatus")
      .version("2.4.0")
      .intConf
      .checkValue(v => v > 0, "The value should be a positive integer.")
      .createWithDefault(2000)
当您的分区数大于

MapOutputStatistics

时,
spark.shuffle.minNumPartitionsToHighlyCompress
将被压缩,在这种情况下AQE可能会被破坏。

© www.soinside.com 2019 - 2024. All rights reserved.