如何避免在加入pyspark的操作中过度洗牌?

问题描述 投票:1回答:1

我有一个大的spark数据框,大小约为25GB,我必须加入另一个大小约为15GB的数据框。

现在当我运行这段代码时,它需要大约15分钟才能完成。

资源分配为40个执行器,每个执行器128GB内存。

当我通过其执行计划时,正在进行排序合并加入。

问题是:在同一个键但不同的表上,连接被执行了大约5到6次。

在同一键但不同的表上进行了5到6次的连接 因为这样,在每次连接时,在合并连接之前,大部分时间都在对数据进行排序和分区。

所以,有没有什么方法可以在执行join之前对数据进行排序,这样就不需要为每次join执行排序操作,或者以这样的方式进行优化,从而减少排序的时间,增加实际join数据的时间?

我只是想在执行join之前对我的数据框架进行排序,但是不知道怎么做?

例如:如果我的数据框在加入数据之前,先对数据进行排序,但不知道该怎么做?

如果我的数据框架是在id列上进行连接的

joined_df = df1.join(df2,df1.id==df2.id)

在加入之前,我怎样才能根据'id'对数据框进行排序,以便分区共同定位?

python apache-spark pyspark apache-spark-sql
1个回答
2
投票

那么,有没有什么方法可以在执行连接之前对数据进行排序,这样就不会在每次连接时进行排序操作,或者以这样的方式进行优化,使其花费更少的时间进行排序,而花费更多的时间实际连接数据?

这闻起来像桶式。

桶式 是一种优化技术,使用桶(和桶列)来确定数据分区,避免数据洗牌。

这个想法是为了 bucketBy 的数据集,这样Spark就知道键是联合定位的(已经预洗牌了)。各个参与join的DataFrames的bucket和bucketing列的数量必须是相同的。

请注意,这是对Hive或Spark表的支持(saveAsTable),因为bucket元数据是从元存储(Spark的或Hive的)中获取的。


1
投票

在过去,我通过按join列对输入数据框进行重新分区,取得了不错的效果。这使得Spark能够执行本地联接,最大限度地减少洗牌,比如说

joined_df = df1.repartition(num_partitions,'id').join(df2.repartition(num_partitions, 'id'),on=['id'])

读取后立即重新分区,以利用分区局部操作,也建议在 这个 回答:

© www.soinside.com 2019 - 2024. All rights reserved.