Is there a way to rewrite a Spark RDD distinct to use mapPartitions instead of distinct?

Question · Votes: 6 · Answers: 2

I have an RDD that is too large to consistently perform a distinct statement without spurious errors (e.g. SparkException: stage failed 4 times, ExecutorLostFailure, HDFS Filesystem closed, max number of executor failures reached, stage cancelled because SparkContext was shut down, etc.).

For example, I am trying to count the distinct IDs in a particular column:

print(myRDD.map(a => a._2._1._2).distinct.count())

Is there a simple, consistent, less shuffle-intensive way to do the command above, perhaps using mapPartitions, reduceByKey, flatMap, or other commands that use fewer shuffles than distinct?

See also: What are the Spark transformations that causes a Shuffle?

scala apache-spark distinct shuffle rdd
2 Answers

2 votes

It would be better to figure out whether there is another underlying issue, but the below will do what you want... a rather round-about way to do it, but it sounds like it fits your bill:

myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(Set[YourType]())((agg, value) => agg + value, (agg1, agg2) => agg1 ++ agg2) 
  .keys
  .count
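For concreteness, here is a minimal, self-contained sketch of the same pattern (my illustration, not from the answer), assuming the distinct values are plain Ints and sc is an existing SparkContext:

// Illustrative sketch only: concrete Int values standing in for YourType.
val ids = sc.parallelize(Seq(1, 2, 2, 3, 3, 3))
val distinctCount = ids
  .map(id => (id, id))                                  // key by the value itself
  .aggregateByKey(Set[Int]())((agg, value) => agg + value, (agg1, agg2) => agg1 ++ agg2)
  .keys
  .count
// distinctCount == 3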

Even this seems to work, although it is not associative and commutative. It works due to how the internals of Spark work... but I may be missing a case... so while it is simpler, I am not sure I trust it:

myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(YourTypeDefault)((x,y)=>y, (x,y)=>x)
  .keys.count
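If the non-commutative merge is a concern, a related sketch (my suggestion, not part of the answer) reduces to a Unit value, so the merge function is trivially associative and commutative:

// Alternative sketch (assumption, not from the original answer): pair each value
// with Unit and reduce; ((_, _) => ()) is trivially associative and commutative.
myRDD.map(a => (a._2._1._2, ()))
  .reduceByKey((_, _) => ())
  .keys
  .count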

0 votes

As far as I know, there are two possible solutions:

  1. Using reduceByKey
  2. Using mapPartitions

Let's see an example.

I have a dataset of 100,000 movie ratings with the format (idUser, (idMovie, rating)). Say we want to know how many distinct users have rated a movie.
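The answer does not show how rddSplitted is built; a minimal sketch, assuming the standard tab-separated MovieLens u.data layout (userId, movieId, rating, timestamp), could be:

// Assumed setup (not shown in the answer): building rddSplitted from the
// MovieLens 100k u.data file, assumed tab-separated as userId, movieId, rating, timestamp.
val rddSplitted = sc.textFile("C:/spark/ricardoExercises/ml-100k/u.data")
  .map { line =>
    val fields = line.split("\t")
    (fields(0).toInt, (fields(1).toInt, fields(2).toDouble))  // (idUser, (idMovie, rating))
  }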

Let's first use distinct:

val numUsers = rddSplitted.keys.distinct()
println(s"numUsers is ${numUsers.count()}")
println("*******toDebugString of rddSplitted.keys.distinct*******")
println(numUsers.toDebugString)

We get the following result:

numUsers is 943

*******toDebugString of rddSplitted.keys.distinct*******
(2) MapPartitionsRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:98 []
 |  ShuffledRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:98 []
 +-(2) MapPartitionsRDD[4] at distinct at MovieSimilaritiesRicImproved.scala:98 []
    |  MapPartitionsRDD[3] at keys at MovieSimilaritiesRicImproved.scala:98 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

With the toDebugString function we can analyze in more detail what happened to the RDD.

Now, let's use reduceByKey, for example counting how many times each user has voted and, at the same time, obtaining the number of distinct users:

val numUsers2 = rddSplitted.map(x => (x._1, 1)).reduceByKey(_ + _)
println(s"numUsers is ${numUsers2.count()}")
println("*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******")
println(numUsers2.toDebugString)

We now get these results:

numUsers is 943

*******toDebugString of rddSplitted.map(x => (x._1, 1)).reduceByKey(_+_)*******
(2) ShuffledRDD[4] at reduceByKey at MovieSimilaritiesRicImproved.scala:104 []
 +-(2) MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:104 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

Analyzing the resulting RDD, we can see that reduceByKey performs the same operation more efficiently than the previous distinct.

Finally, let's use mapPartitions. The main goal is to first find the distinct users within each partition of the dataset, and then obtain the final distinct users from those.

val a1 = rddSplitted.map(x => (x._1))
println(s"Number of elements in a1: ${a1.count}")
val a2 = a1.mapPartitions(x => x.toList.distinct.toIterator)
println(s"Number of elements in a2: ${a2.count}")
val a3 = a2.distinct()
println("There are "+ a3.count()+" different users")
println("*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******")
println(a3.toDebugString)
We get the following:

Number of elements in a1: 100000
Number of elements in a2: 1709
There are 943 different users

*******toDebugString of map(x => (x._1)).mapPartitions(x => x.toList.distinct.toIterator).distinct *******
(2) MapPartitionsRDD[7] at distinct at MovieSimilaritiesRicImproved.scala:124 []
 |  ShuffledRDD[6] at distinct at MovieSimilaritiesRicImproved.scala:124 []
 +-(2) MapPartitionsRDD[5] at distinct at MovieSimilaritiesRicImproved.scala:124 []
    |  MapPartitionsRDD[4] at mapPartitions at MovieSimilaritiesRicImproved.scala:122 []
    |  MapPartitionsRDD[3] at map at MovieSimilaritiesRicImproved.scala:120 []
    |  MapPartitionsRDD[2] at map at MovieSimilaritiesRicImproved.scala:94 []
    |  C:/spark/ricardoExercises/ml-100k/u.data MapPartitionsRDD[1] at textFile at MovieSimilaritiesRicImproved.scala:90 []
    |  C:/spark/ricardoExercises/ml-100k/u.data HadoopRDD[0] at textFile at MovieSimilaritiesRicImproved.scala:90 []

We can now see that mapPartitions first obtains the distinct users within each partition of the dataset, reducing the number of records from 100,000 to 1,709 without any shuffling. Then, on this much smaller amount of data, the distinct over the whole RDD can be performed without worrying about the shuffle, and the result is obtained much faster.

I would recommend this last approach with mapPartitions rather than the reduceByKey one, since it pushes a smaller amount of data through the shuffle. Another solution could be to combine both functions: first mapPartitions as described above, and then reduceByKey instead of distinct, in the same way as described before; a sketch of that combination follows.
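A minimal sketch of that combined approach (my illustration, assuming the same rddSplitted as above, not code from the answer):

// Sketch of the combined approach: deduplicate within each partition first,
// then replace distinct with reduceByKey on the reduced data.
val perPartitionDistinct = rddSplitted
  .map(x => x._1)
  .mapPartitions(users => users.toSet.iterator)   // local dedup, no shuffle
val numDistinctUsers = perPartitionDistinct
  .map(u => (u, ()))
  .reduceByKey((_, _) => ())                      // single shuffle over far fewer records
  .count()
println(s"There are $numDistinctUsers different users")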