collectAsMap在此序列中以火花形式运行map / sortByKey / collectAsMap时不保留顺序?

问题描述 投票:0回答:1

我正在尝试使用collectAsMap从rdd(元组列表)创建字典。我注意到collectAsMap没有保留排序顺序。创建字典时是否可以保留顺序?

Sample input to collectAsMap
[(1,[2,5,6]),(2,[4,8]),(3,[1]),(4,[2,6,8,9,10])]

Expected output
{1:[2,5,6],
 2:[4,8],
 3:[1],
 4:[2,6,8,9,10]}

Actual output
{1:[2,5,6],
 4:[2,6,8,9,10],
 2:[4,8],
 3:[1]
 }

python apache-spark pyspark
1个回答
0
投票

collectAsMap如何工作?

如果您查看源代码并在PairRDDFunction文件下查找collectAsMap,您将看到其实现:

def collectAsMap(): Map[K, V] = self.withScope {
    val data = self.collect()
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
    data.foreach { pair => map.put(pair._1, pair._2) }
    map
}

调试上面的代码有助于我们了解重新排序的位置。假设我们运行yourSortedRdd.collectAsMap,其中yourSortedRdd[(1,[2,5,6]),(2,[4,8]),(3,[1]),(4,[2,6,8,9,10])]

def collectAsMap(): Map[K, V] = self.withScope {
    val data = self.collect() // Array[(Int,Seq[Int])] = Array((1,List(2,5,6)),(2,List(4,8)),(3,List(1)),(4,List(2,6,8,9,10)))
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
    data.foreach { pair => map.put(pair._1, pair._2) } // Array[(Int,Seq[Int])] = Array((2,List(4,8)),(4,List(2,6,8,9,10)),(1,List(2,5,6)),(3,List(1)))
    map
}

(注释表示我们为每一行获取的值)如您所见,collect保留顺序,因此foreach(是有保证的,请参见___)。但是,将新元素添加到hashMap会重新排序数据。这是因为元组将根据其哈希进行排序-从而可能会更改其位置。

解决方案

例如,我们可以重新编写此函数以使用ListHashMap来保留插入顺序:

def collectAsMap[K, V](sortedRdd:Rdd[K, V]): Map[K, V] = {
    val data = sortedRdd.collect()
    val map = new mutable.LinkedHashMap[K, V]  #<-- Preserve insertion order
    map.sizeHint(data.length)
    data.foreach { pair => map.put(pair._1, pair._2) }
    map
}
© www.soinside.com 2019 - 2024. All rights reserved.