提高读取实木复合地板文件的并行度-Spark优化自连接

问题描述 投票:2回答:1

我想执行自连接,以生成候选匹配对。当前,此操作不起作用,因为此操作太慢。不幸的是,由于数据帧太大,我无法广播。

首先,我汇总元组的数量以减少数据:

val aggregated = df.groupBy("discrete_foo", "discrete_bar").agg(sum("value"))
aggregated.repartition(7, "discrete_foo", "discrete_bar").sortWithinPartitions("discreate_foo", "discrete_bar, "baz").write.parquet.option("compression", "gzip").mode("overwrite")

这很好,而且很快。然后,我想执行自我联接以生成候选对象。我已经观察到需要产生更多的并行性:

--conf spark.sql.shuffle.partitions=4000 \
--conf spark.default.parallelism=4000 \ 

因此,设置了更高的默认值和随机并行度。另外,我尝试粗化两个离散值(即增加落入离散块的项目数),从而减少元组的数量。仍然没有运气。因此,我还尝试通过重新分区来强制执行大量任务:

val materializedAggregated= spark.read.parquet(s"path/to/file/aggregated_stuff"      )
  .repartition(4000)
val selfB = materializedAggregated
  .withColumnRenamed("baz", "other_batz")
  .withColumnRenamed("value", "other_value")

val candidates = materializedMultiSTW
  .join(selfB, Seq("discrete_foo", "discrete_bar"), "inner")
  .filter(col(FeedIdentifierColumns.imsiHash) =!= col("other_imsi_hash"))

但是这也不起作用,而且速度太慢。我还能做些什么使该查询计算更快?有什么我想念的吗?

下面,您将在读取自连接数据时尝试增加并行度的尝试失败。

我什至设置:

--conf spark.sql.files.maxPartitionBytes=16777216 \

到1/8,即16个vs. 128MB,仍然生成的任务数量太少,即只有250个。

一些细节

执行计划:

enter image description here

即使没有此手动重新分区,它也太慢了,我担心创建的分区不足:

enter image description here

处理更少的任务-最有可能使它变慢:

enter image description here

我如何确保此初始步骤具有更高的并行度?铲斗有帮助吗?但是,当只读取经过改组的数据一次时-并不能真正提高速度-对吗?写入聚合文件时的重新分区步骤如何?我应该在这里设置一个更大的数字吗?到目前为止,即使省略它(并且基本上重新计算聚合两次),它也不会增加超过260个任务。

环境

我在HDP 3.1上使用spark 2.3.x

apache-spark optimization apache-spark-sql self-join
1个回答
1
投票

内部联接的最大任务数将等于联接键的数目(即其基数),而与spark.sql.shuffle.partitionsspark.default.parallelism的设置无关。

这是因为在SortMergeJoin中,将使用连接键的哈希值对数据进行混洗。来自每个不同的联接键的所有数据将转到单个执行器。

因此,问题是您没有足够的垃圾箱-它们太粗糙了。您将看到的最大任务数将等于垃圾箱数。

如果您对数据进行更精细的分类,您应该会看到任务数量增加。

© www.soinside.com 2019 - 2024. All rights reserved.