为了了解Spark分区的工作原理,我在spark 1.6上有以下代码段
// Count size of partition for RDD[(String, Int)]
def countByPartition1(rdd: RDD[(String, Int)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
// Count size of partition for RDD[String]
def countByPartition2(rdd: RDD[String]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
// Case 1
val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1), ("aa", 1)), 8)
countByPartition1(rdd1).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
// Case 2
val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
countByPartition2(rdd2).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
在这两种情况下,数据都是均匀分布的。基于上述观察,我确实有以下问题:
我也关注了@zero323's answer,但找不到上述问题的答案。
从读取文件或从驱动程序生成自己的这种initial分配中,没有应用像散列一样的实际分区程序。
如果运行val p = rdd1.partitioner
,将看到值None
。
给出V或(K,V)格式的RDD的N个值-则V或(K,V)格式实际上不相关:
然后,对于M个分区,Spark必须具有一种算法来计算将哪些数据放置在哪里,否则它将永远无法继续执行此步骤,并且我们无法继续进行下去!
然后,Spark会根据下一个整数(M / N),以相等的间隔初始放置相等数量的数据。
没有应用散列还。边缘情况是初始分配。对于驱动程序创建的RDD,这是它的工作方式,对于基于文件的源则有些不同。
[parallelize
]通过将Seq
uence划分为x
个切片,而不使用哈希分区(source)将其分布在多个分区中。