Apache Spark:一个地图任务中有多个输出

问题描述 投票:2回答:2

TL; DR:我有一个大文件,我对其进行了三遍迭代,以得出三组不同的计数。有没有一种方法可以一次通过数据获取三张地图?

更多详细信息:

我正在尝试计算大型文件中列出的单词和特征之间的PMI。我的管道看起来像这样:

val wordFeatureCounts = sc.textFile(inputFile).flatMap(line => {
  val word = getWordFromLine(line)
  val features = getFeaturesFromLine(line)
  for (feature <- features) yield ((word, feature), 1)
})

然后我重复此操作以分别获得字数和特征数:

val wordCounts = sc.textFile(inputFile).flatMap(line => {
  val word = getWordFromLine(line)
  val features = getFeaturesFromLine(line)
  for (feature <- features) yield (word, 1)
})

val featureCounts = sc.textFile(inputFile).flatMap(line => {
  val word = getWordFromLine(line)
  val features = getFeaturesFromLine(line)
  for (feature <- features) yield (feature, 1)
})

((我意识到我可以遍历wordFeatureCounts以获得wordCountsfeatureCounts,但这并不能回答我的问题,在实践中查看运行时间,我不确定这样做的速度实际上更快还请注意,在计算完计数后,还有一些reduceByKey操作和其他我要处理的东西未显示,因为它们与问题无关。]

我真正想做的是这样的:

val (wordFeatureCounts, wordCounts, featureCounts) = sc.textFile(inputFile).flatMap(line => {
  val word = getWordFromLine(line)
  val features = getFeaturesFromLine(line)
  val wfCounts = for (feature <- features) yield ((word, feature), 1)
  val wCounts = for (feature <- features) yield (word, 1)
  val fCounts = for (feature <- features) yield (feature, 1)
  ??.setOutput1(wfCounts)
  ??.setOutput2(wCounts)
  ??.setOutput3(fCounts)
})

有没有办法用spark做到这一点?在寻找方法时,在将结果保存到磁盘时我遇到了关于多个输出的问题(无用),并且我对累加器也有所了解(看起来不像我所需要的) ,仅此而已。

还要注意,我不能只将所有这些结果汇总在一个大列表中,因为我需要三张单独的地图。如果有一种有效的方法可以在事后拆分组合的RDD,那可能行得通,但我想到的唯一方法是最终对数据进行四次迭代,而不是我当前进行的三遍(一次创建)组合的地图,然后进行三遍将其过滤到我实际想要的地图中。)

scala apache-spark
2个回答
0
投票

无法将一个RDD拆分为多个RDD。如果您考虑一下这将如何工作,这是可以理解的。假设您将RDD x = sc.textFile("x")分为a = x.filter(_.head == 'A')b = x.filter(_.head == 'B')。到目前为止,什么都没有发生,因为RDD很懒。但是现在您打印a.count。因此,Spark打开文件,并在各行中进行迭代。如果该行以A开头,则对其进行计数。但是,如何处理以B开头的行呢?将来会打给b.count吗?也许是b.saveAsTextFile("b"),我们应该将这些行写在某个地方吗?我们目前不知道。使用Spark API不可能拆分RDD。

但是,只要您知道自己想要什么,什么都不会阻止您实施某些东西。如果要同时获取a.countb.count,则可以将以A开头的行映射到(1, 0),将以B开头的行映射到(0, 1),然后以约数的方式对元组求和。如果要将B中的行保存到文件中,而用A进行行计数,则可以在map之前的filter(_.head == 'B').saveAsTextFile中使用聚合器。

唯一的通用解决方案是将中间数据存储在某个地方。一种选择是仅缓存输入(x.cache)。另一个方法是一次将内容写到单独的目录中,然后将它们读回为单独的RDD。 (请参阅Write to multiple outputs by key Spark - one Spark job。)我们在生产中执行此操作,效果很好。


0
投票

这是Spark相对于传统的map-reduce编程的主要缺点之一。可以将一个RDD / DF / DS转换为另一个RDD / DF / DS,但是您不能将一个RDD映射到多个输出中。为了避免重新计算,您需要将结果缓存到某个中间RDD中,然后运行多个映射操作以生成多个输出。但是,如果数据很大,则中间输出将溢出到磁盘上,并且缓存的优势不会那么大。在此处查看讨论-https://issues.apache.org/jira/browse/SPARK-1476。这是一个古老的吉拉,但很重要。请查看Mridul Muralidharan的评论。

© www.soinside.com 2019 - 2024. All rights reserved.