ReduceByKey函数在Spark中

问题描述 投票:1回答:3

我已经读过某些地方,对于作用于单个RDD的操作,例如reduceByKey(),在预分区的RDD上运行将导致每个键的所有值在本地计算在一台机器上,只需要最终,本地减少要从每个工作节点发送回主节点的值。这意味着我必须声明一个分区器,如:

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
             .partitionBy(new HashPartitioner(100))   // Create 100 partitions
             .persist() 

为了使reduceByKey像我之前解释的那样工作。

我的问题是,如果我想使用reduceByKey(最佳),我是否需要每次分区时声明或者不需要。

scala apache-spark rdd partitioning reduce
3个回答
3
投票

执行reduceByKey时对RDD进行分区以避免网络流量几乎不是最佳解决方案。即使它不需要为reduceByKey进行改组,它也必须对一个完整的数据集进行混洗以执行分区。

由于这通常要贵​​得多,因此预分区是没有意义的,除非您的目标是以增加总体延迟为代价来减少reduceByKey阶段的延迟,或者您可以将此分区用于其他任务。


2
投票

并不是的。 reduceByKey正在使用数据位置。来自RDD api:

/ ** *使用关联reduce函数合并每个键的值。这也将在将结果发送到reducer之前在每个mapper上执行*本地合并,类似于MapReduce中的*“combiner”。 * /

这意味着当你有一个键值RDD时,在第一阶段,使用提供的函数减少每个分区级别的相同键,然后使用相同的函数对所有已聚合的值进行shuffle和global reduce。无需提供分区程序。它只是有效。


2
投票

实际上,你所谈论的两个品质有点无关。

对于reduceByKey(),第一个质量聚合相同键的元素与提供的关联reduce函数在本地首先在每个执行器上,然后最终聚合在执行器之间。它封装在一个名为mapSideCombine的布尔参数中,如果设置为true则执行上述操作。如果设置为false,就像groupByKey()一样,每个记录将被洗牌并发送给正确的执行者。

第二个质量涉及分区及其使用方式。根据其定义,每个RDD包含分裂列表和(可选地)分区器。方法reduceByKey()重载,实际上有一些定义。例如:

  • def reduceByKey(func: (V, V) => V): RDD[(K, V)] 该方法的这个定义实际上使用来自父RDD的默认现有分区器,并减少到设置为默认并行度级别的分区数。
  • def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] 该方法的定义将使用HashPartitioner将相应的数据传输到相应的执行器,并且分区的数量将为numPartitions
  • def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] 最后,该方法的这个定义取代了另外两个,并且接受了一个通用(可能是自定义)分区器,它将产生由分区器如何分区键确定的分区数。

关键在于您可以在reduceByKey()本身内实际编码所需的分区逻辑。如果你的意图是通过预分区来避免改变开销,那么它也没有意义,因为你仍然会在预分区上进行洗牌。

© www.soinside.com 2019 - 2024. All rights reserved.