何时使用countByValue以及何时使用map()。reduceByKey()

问题描述 投票:0回答:3

我是Spark和scala的新手,并且正在研究一个简单的wordCount示例。

所以为此我使用countByValue如下:

val words = lines.flatMap(x => x.split("\\W+")).map(x => x.toLowerCase())
val wordCount = words.countByValue();

哪个工作正常。

同样的事情可以实现如下:

val words = lines.flatMap(x => x.split("\\W+")).map(x => x.toLowerCase())
val wordCounts = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
val sortedWords = wordCounts.map(x => (x._2, x._1)).sortByKey()

这也很好。

现在,我的问题是何时使用哪种方法?哪一个优于另一个?

scala apache-spark rdd word-count
3个回答
4
投票

这里的例子 - 不是单词,而是数字:

val n = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
val n2 = n.countByValue

返回本地地图:

n: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at command-3737881976236428:1
n2: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)

这是关键的区别。

如果你想要一个开箱即用的地图,那么这就是你要走的路。

此外,重点是暗示降低并且不受影响,也不需要像reduceByKey那样提供。

当数据大小很大时,reduceByKey具有首选项。地图完整地加载到驱动程序内存中。


4
投票

至少在PySpark中,它们是不同的东西。

countByKey是用reduce实现的,这意味着驱动程序将收集分区的部分结果并自行合并。如果你的结果很大,那么驱动程序将必须合并大量的大词典,这将使驱动程序变得疯狂。

reduceByKey将关键字改组为不同的执行者,并对每个工作者进行减少,因此如果数据量很大则更有利。

总而言之,当您的数据很大时,使用mapreduceByKeycollect将使您的驾驶员更加快乐。如果您的数据很小,countByKey将减少网络流量(减少一个阶段)。


1
投票

加上以上所有答案,这是我进一步发现的:

  1. CountByValue返回一个不能以分布式方式使用的映射。
  2. ReduceByKey返回一个rdd,可以进一步以分布式方式使用。
© www.soinside.com 2019 - 2024. All rights reserved.