为什么我的group by操作时shuffle分区不是200(默认)? (火花2.4.5)

问题描述 投票:0回答:2

我对 Spark 很陌生,并试图了解它的内部结构。所以, 我正在从 s3 读取一个 50MB 的小 parquet 文件并执行分组,然后保存回 s3。 当我观察 Spark UI 时,我可以看到为此创建了 3 个阶段,

阶段 0:加载(1 个任务)

第 1 阶段:用于分组的 shufflequerystage(12 个任务)

第 2 阶段:保存(coalescedshufflereader)(26 个任务)

代码示例:

df = spark.read.format("parquet").load(src_loc)
df_agg = df.groupby(grp_attribute)\                             
 .agg(F.sum("no_of_launches").alias("no_of_launchesGroup")
df_agg.write.mode("overwrite").parquet(target_loc)

我使用的 EMR 实例有 1 个主节点、3 个核心节点(每个节点有 4 个 vcore)。因此,默认并行度为 12。我不会在运行时更改任何配置。但我不明白为什么最后阶段会创建26个任务?据我了解,默认情况下,随机播放分区应为 200。附加 UI 的屏幕截图。

apache-spark pyspark apache-spark-sql amazon-emr
2个回答
4
投票

我在 Databricks 上使用 Spark 2.4.5 尝试了类似的逻辑。

我观察到,使用

spark.conf.set('spark.sql.adaptive.enabled', 'true')
,我的分区的最终数量是 2。

我观察到,使用

spark.conf.set('spark.sql.adaptive.enabled', 'false')
spark.conf.set('spark.sql.shuffle.partitions', 75)
,我的分区的最终数量是 75。

使用

print(df_agg.rdd.getNumPartitions())
揭示了这一点。

因此,Spark UI 上的作业输出并未反映这一点。可能最后会发生重新分区。有趣,但不是真正的问题。


2
投票

在Spark sql中,shuffle分区的数量是使用spark.sql.shuffle.partitions设置的,默认为200。在大多数情况下,这个数字对于较小的数据来说太高,对于较大的数据来说太小。对于开发人员来说,选择正确的值总是很棘手。

因此我们需要能够通过查看映射器输出来合并洗牌分区。如果映射生成少量分区,我们希望减少整体洗牌分区,从而提高性能。

在最新版本中,Spark3.0 具有自适应查询执行,这种减少任务的功能是自动化的。 http://blog.madhukaraphatak.com/spark-aqe-part-2/

考虑到 Spark2.4.5 中的这一点,Catalist 优化器或 EMR 也可能启用此功能来减少内部任务,而不是 200 个任务。

© www.soinside.com 2019 - 2024. All rights reserved.