Spark - repartition()vs coalesce()

问题描述 投票:180回答:10

根据Learning Spark的说法

请记住,重新分区数据是一项相当昂贵的操作。 Spark还有一个优化版本的repartition(),称为coalesce(),它允许避免数据移动,但前提是你减少了RDD分区的数量。

我得到的一个区别是,使用repartition()可以增加/减少分区数量,但是使用coalesce()时,只能减少分区数量。

如果分区分布在多台机器上并运行coalesce(),它如何避免数据移动?

apache-spark distributed-computing rdd
10个回答
273
投票

它避免了完全洗牌。如果已知该数字正在减少,则执行程序可以安全地将数据保存在最小数量的分区上,仅将数据从额外节点移出到我们保留的节点上。

所以,它会是这样的:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

然后coalesce下降到2个分区:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

请注意,节点1和节点3不需要移动其原始数据。


0
投票

但是,如果您正在处理大量数据,那么您应该确保即将合并节点的数据应该具有高度配置。因为所有数据都将加载到那些节点,可能会导致内存异常。虽然赔偿成本很高,但我更喜欢使用它。因为它平均地混洗和分发数据。

明智地选择合并和重新分配。


0
投票

对于从PySpark(AWS EMR)生成单个csv文件作为输出并将其保存在s3上的问题,使用重新分区有帮助。原因是,合并不能完全洗牌,但重新分配可以。实际上,您可以使用重新分区来增加或减少分区数,但只能使用coalesce减少分区数(但不能减少1个)。以下是试图将AWS EMR写入s3v到s3的任何人的代码:

df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')

126
投票

贾斯汀的答案很棒,而且这种反应更深入。

repartition算法执行完全shuffle并创建具有均匀分布的数据的新分区。让我们创建一个数字从1到12的数据框架。

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf在我的机器上包含4个分区。

numbersDf.rdd.partitions.size // => 4

以下是分区上数据的划分方式:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

让我们用repartition方法进行全面改组,并在两个节点上获取这些数据。

val numbersDfR = numbersDf.repartition(2)

以下是numbersDfR数据在我的机器上的分区方式:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

repartition方法创建新分区并在新分区中均匀分布数据(对于较大的数据集,数据分布更均匀)。

coalescerepartition之间的区别

coalesce使用现有分区来最小化被洗牌的数据量。 repartition创建了新的分区并进行了全面的随机播放。 coalesce导致具有不同数据量的分区(有时具有大小不同的分区),而repartition导致大小相等的分区。

coalescerepartition更快吗?

coalesce可能比repartition运行得更快,但不等大小的分区通常比相同大小的分区更慢。在过滤大型数据集后,您通常需要重新分区数据集。我发现repartition的整体速度更快,因为Spark可以使用相同大小的分区。

Read this blog post如果你想要更多细节。


18
投票

这里需要注意的另一点是,Spark RDD的基本原理是不变性。重新分区或合并将创建新的RDD。基本RDD将继续存在其原始分区数。如果用例要求在缓存中保留RDD,则必须对新创建的RDD执行相同操作。

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2

8
投票

所有答案都在这个经常被问到的问题中添加了一些很好的知识。

按照这个问题的时间表的传统,这是我的2美分。

在非常具体的情况下,我发现重新分区比合并更快。

在我的应用程序中,当我们估计的文件数低于特定阈值时,重新分区工作得更快。

这就是我的意思

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

在上面的代码片段中,如果我的文件小于20,则coalesce将永远完成,而重新分区要快得多,所以上面的代码。

当然,这个数字(20)将取决于工人数量和数据量。

希望有所帮助。


7
投票

repartition - 建议在增加分区数时使用重新分区,因为它涉及到所有数据的混乱。

coalesce-建议使用coalesce而不减少分区。例如,如果您有3个分区并且想要将其减少到2个分区,Coalesce会将第3个分区数据移动到分区1和2.分区1和2将保留在同一个Container中。但是重新分区将在所有分区中混洗数据以便网络使用执行者之间的关系会很高,会影响性能。

性能明智的coalesce性能优于repartition,同时减少了没有分区。


2
投票

以简单的方式COALESCE: - 仅用于减少分区的数量,没有数据的混乱它只是压缩分区

REPARTITION: - 用于增加和减少分区的数量,但是进行改组

例:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

两者都很好

但是当我们需要在一个集群中看到输出时,我们通常会考虑这两件事,我们就这样做了。


2
投票

对于所有伟大的答案,我想补充一点,重新分区是利用数据并行化的最佳选择,并且合并提供了减少分区的廉价选项,并且在将数据写入HDFS或其他一些接收器以利用时非常有用大写。我发现这在以镶木地板格式编写数据时非常有用,可以充分利用它。


1
投票

我想补充Justin和Power的回答 -

“repartition”将忽略现有分区并创建新分区。所以你可以用它来修复数据偏斜。您可以提及分区键来定义分发。数据偏差是“大数据”问题空间中最大的问题之一。

“coalesce”将与现有分区一起使用并对其中的一部分进行洗牌。它无法像“重新分区”那样修复数据偏差。因此,即使它更便宜,也可能不是你需要的东西。


1
投票

code和代码文档得到的是coalesce(n)coalesce(n, shuffle = false)相同,repartition(n)coalesce(n, shuffle = true)相同

因此,coalescerepartition都可用于增加分区数量

使用shuffle = true,您实际上可以合并到更大数量的分区。如果您有少量分区(例如100),可能会使一些分区异常大,这很有用。

另一个要强调的重要注意事项是,如果你大幅减少分区数量,你应该考虑使用改组版本的coalesce(在这种情况下与repartition相同)。这将允许您的计算在父分区上并行执行(多个任务)。

但是,如果你正在进行激烈的合并,例如对于numPartitions = 1,这可能导致您的计算发生在比您喜欢的节点更少的节点上(例如,在numPartitions = 1的情况下,一个节点)。为避免这种情况,您可以传递shuffle = true。这将添加一个shuffle步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。

另请参阅相关答案here

© www.soinside.com 2019 - 2024. All rights reserved.