我对 Spark 很陌生,并试图了解它的内部结构。所以, 我正在从 s3 读取一个 50MB 的小 parquet 文件并执行分组,然后保存回 s3。 当我观察 Spark UI 时,我可以看到为此创建了 3 个阶段,
阶段 0:加载(1 个任务)
第 1 阶段:用于分组的 shufflequerystage(12 个任务)
第 2 阶段:保存(coalescedshufflereader)(26 个任务)
代码示例:
df = spark.read.format("parquet").load(src_loc)
df_agg = df.groupby(grp_attribute)\
.agg(F.sum("no_of_launches").alias("no_of_launchesGroup")
df_agg.write.mode("overwrite").parquet(target_loc)
我使用的 EMR 实例有 1 个主节点、3 个核心节点(每个节点有 4 个 vcore)。因此,默认并行度为 12。我不会在运行时更改任何配置。但我不明白为什么最后阶段会创建26个任务?据我了解,默认情况下,随机播放分区应为 200。附加 UI 的屏幕截图。
我在 Databricks 上使用 Spark 2.4.5 尝试了类似的逻辑。
我观察到,使用
spark.conf.set('spark.sql.adaptive.enabled', 'true')
,我的分区的最终数量是 2。
我观察到,使用
spark.conf.set('spark.sql.adaptive.enabled', 'false')
和 spark.conf.set('spark.sql.shuffle.partitions', 75)
,我的分区的最终数量是 75。
使用
print(df_agg.rdd.getNumPartitions())
揭示了这一点。
因此,Spark UI 上的作业输出并未反映这一点。可能最后会发生重新分区。有趣,但不是真正的问题。
在Spark sql中,shuffle分区的数量是使用spark.sql.shuffle.partitions设置的,默认为200。在大多数情况下,这个数字对于较小的数据来说太高,对于较大的数据来说太小。对于开发人员来说,选择正确的值总是很棘手。
因此我们需要能够通过查看映射器输出来合并洗牌分区。如果映射生成少量分区,我们希望减少整体洗牌分区,从而提高性能。
在最新版本中,Spark3.0 具有自适应查询执行,这种减少任务的功能是自动化的。 http://blog.madhukaraphatak.com/spark-aqe-part-2/
考虑到 Spark2.4.5 中的这一点,Catalist 优化器或 EMR 也可能启用此功能来减少内部任务,而不是 200 个任务。