为什么在reduceByKey之后所有数据最终都集中在一个分区中?

问题描述 投票:5回答:2

我有这个简单的spark程序。我想知道为什么所有数据最终都集中在一个分区中。

val l = List((30002,30000), (50006,50000), (80006,80000), 
             (4,0), (60012,60000), (70006,70000), 
             (40006,40000), (30012,30000), (30000,30000),
             (60018,60000), (30020,30000), (20010,20000), 
             (20014,20000), (90008,90000), (14,0), (90012,90000),
             (50010,50000), (100008,100000), (80012,80000),
             (20000,20000), (30010,30000), (20012,20000), 
             (90016,90000), (18,0), (12,0), (70016,70000), 
             (20,0), (80020,80000), (100016,100000), (70014,70000),
             (60002,60000), (40000,40000), (60006,60000), 
             (80000,80000), (50008,50000), (60008,60000), 
             (10002,10000), (30014,30000), (70002,70000),
             (40010,40000), (100010,100000), (40002,40000),
             (20004,20000), 
             (10018,10000), (50018,50000), (70004,70000),
             (90004,90000), (100004,100000), (20016,20000))

val l_rdd = sc.parallelize(l, 2)

// print each item and index of the partition it belongs to
l_rdd.mapPartitionsWithIndex((index, iter) => {
   iter.toList.map(x => (index, x)).iterator
}).collect.foreach(println)

// reduce on the second element of the list.
// alternatively you can use aggregateByKey  
val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a})

// print the reduced results along with its partition index
l_reduced.mapPartitionsWithIndex((index, iter) => {
      iter.toList.map(x => (index, x._1, x._2.size)).iterator
}).collect.foreach(println)

运行时,您会看到数据(l_rdd)被分配到两个分区中。一旦减少,结果RDD(l_reduced)也有两个分区,但是所有数据都在一个分区(索引0)中,而另一个则为空。即使数据很大(几GB),也会发生这种情况。 l_reduced也不应分配到两个分区中。

apache-spark rdd
2个回答
4
投票
val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a})

参考上面的代码段,您正在按RDD的第二个字段进行分区。第二个字段中的所有数字都以0结尾。

当您调用HashPartitioner时,记录的分区号由以下function决定:

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

并且Utils.nonNegativeMod被定义为follows

 def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

让我们看看将以上两个逻辑应用于您的输入时会发生什么:

scala> l.map(_._2.hashCode % 2) // numPartitions = 2
res10: List[Int] = List(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)

因此,所有记录都位于分区0中。

您可以通过重新分区解决此问题:

val l_reduced = l_rdd.map(x => {
                    (x._2, List(x._1))
                  }).reduceByKey((a, b) => {b ::: a}).repartition(2)

给出:

(0,100000,4)
(0,10000,2)
(0,0,5)
(0,20000,6)
(0,60000,5)
(0,80000,4)
(1,50000,4)
(1,30000,6)
(1,90000,4)
(1,70000,5)
(1,40000,4)

或者,您可以创建一个custom partitioner


0
投票

除非您另外指定,否则将根据相关键的哈希码进行分区,并假定哈希码将导致相对均匀的分布。在这种情况下,您的哈希码都是偶数,因此都将进入分区0。

如果这确实代表您的数据集,则reduceByKey会有一个重载,它占用了分区程序以及reduce函数。我建议为这样的数据集提供替代的分区算法。

© www.soinside.com 2019 - 2024. All rights reserved.