Spark如何使用哈希分区程序分发数据?

问题描述 投票:0回答:2

为了了解Spark分区的工作原理,我在spark 1.6上有以下代码段

// Count size of partition for RDD[(String, Int)]
def countByPartition1(rdd: RDD[(String, Int)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

// Count size of partition for RDD[String]
def countByPartition2(rdd: RDD[String]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

// Case 1
val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1), ("aa", 1)), 8)
countByPartition1(rdd1).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)

// Case 2
val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
countByPartition2(rdd2).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)

在这两种情况下,数据都是均匀分布的。基于上述观察,我确实有以下问题:

  1. 在rdd1的情况下,哈希分区应该计算键的哈希码(在这种情况下,即“ aa”,因此所有记录都应该转到单个分区而不是统一分配?
  2. 在rdd2的情况下,没有键值对,因此哈希分区如何起作用,即计算哈希码的关键是什么?

我也关注了@zero323's answer,但找不到上述问题的答案。

apache-spark hash rdd partition
2个回答
0
投票

从读取文件或从驱动程序生成自己的这种initial分配中,没有应用像散列一样的实际分区程序。

如果运行val p = rdd1.partitioner,将看到值None

  • 给出V或(K,V)格式的RDD的N个值-则V或(K,V)格式实际上不相关:

    • 然后,对于M个分区,Spark必须具有一种算法来计算将哪些数据放置在哪里,否则它将永远无法继续执行此步骤,并且我们无法继续进行下去!

      • 然后,Spark会根据下一个整数(M / N),以相等的间隔初始放置相等数量的数据。

        • 因此,如果我有4个带有10个分区的值,则(10 / 2.5)个步骤的下一个较高整数将Spark放入数据。那就是你所看到的。如您所见,同样适用于具有8个分区的4个值。

没有应用散列。边缘情况是初始分配。对于驱动程序创建的RDD,这是它的工作方式,对于基于文件的源则有些不同。


0
投票

[parallelize]通过将Seq uence划分为x个切片,而不使用哈希分区(source)将其分布在多个分区中。

© www.soinside.com 2019 - 2024. All rights reserved.