我想执行自连接,以生成候选匹配对。当前,此操作不起作用,因为此操作太慢。不幸的是,由于数据帧太大,我无法广播。
首先,我汇总元组的数量以减少数据:
val aggregated = df.groupBy("discrete_foo", "discrete_bar").agg(sum("value"))
aggregated.repartition(7, "discrete_foo", "discrete_bar").sortWithinPartitions("discreate_foo", "discrete_bar, "baz").write.parquet.option("compression", "gzip").mode("overwrite")
这很好,而且很快。然后,我想执行自我联接以生成候选对象。我已经观察到需要产生更多的并行性:
--conf spark.sql.shuffle.partitions=4000 \
--conf spark.default.parallelism=4000 \
因此,设置了更高的默认值和随机并行度。另外,我尝试粗化两个离散值(即增加落入离散块的项目数),从而减少元组的数量。仍然没有运气。因此,我还尝试通过重新分区来强制执行大量任务:
val materializedAggregated= spark.read.parquet(s"path/to/file/aggregated_stuff" )
.repartition(4000)
val selfB = materializedAggregated
.withColumnRenamed("baz", "other_batz")
.withColumnRenamed("value", "other_value")
val candidates = materializedMultiSTW
.join(selfB, Seq("discrete_foo", "discrete_bar"), "inner")
.filter(col(FeedIdentifierColumns.imsiHash) =!= col("other_imsi_hash"))
但是这也不起作用,而且速度太慢。我还能做些什么使该查询计算更快?有什么我想念的吗?
下面,您将在读取自连接数据时尝试增加并行度的尝试失败。
我什至设置:
--conf spark.sql.files.maxPartitionBytes=16777216 \
到1/8,即16个vs. 128MB,仍然生成的任务数量太少,即只有250个。
执行计划:
即使没有此手动重新分区,它也太慢了,我担心创建的分区不足:
处理更少的任务-最有可能使它变慢:
我如何确保此初始步骤具有更高的并行度?铲斗有帮助吗?但是,当只读取经过改组的数据一次时-并不能真正提高速度-对吗?写入聚合文件时的重新分区步骤如何?我应该在这里设置一个更大的数字吗?到目前为止,即使省略它(并且基本上重新计算聚合两次),它也不会增加超过260个任务。
我在HDP 3.1上使用spark 2.3.x
内部联接的最大任务数将等于联接键的数目(即其基数),而与spark.sql.shuffle.partitions
和spark.default.parallelism
的设置无关。
这是因为在SortMergeJoin中,将使用连接键的哈希值对数据进行混洗。来自每个不同的联接键的所有数据将转到单个执行器。
因此,问题是您没有足够的垃圾箱-它们太粗糙了。您将看到的最大任务数将等于垃圾箱数。
如果您对数据进行更精细的分类,您应该会看到任务数量增加。