TL; DR:我有一个大文件,我对其进行了三遍迭代,以得出三组不同的计数。有没有一种方法可以一次通过数据获取三张地图?
更多详细信息:
我正在尝试计算大型文件中列出的单词和特征之间的PMI。我的管道看起来像这样:
val wordFeatureCounts = sc.textFile(inputFile).flatMap(line => {
val word = getWordFromLine(line)
val features = getFeaturesFromLine(line)
for (feature <- features) yield ((word, feature), 1)
})
然后我重复此操作以分别获得字数和特征数:
val wordCounts = sc.textFile(inputFile).flatMap(line => {
val word = getWordFromLine(line)
val features = getFeaturesFromLine(line)
for (feature <- features) yield (word, 1)
})
val featureCounts = sc.textFile(inputFile).flatMap(line => {
val word = getWordFromLine(line)
val features = getFeaturesFromLine(line)
for (feature <- features) yield (feature, 1)
})
((我意识到我可以遍历wordFeatureCounts
以获得wordCounts
和featureCounts
,但这并不能回答我的问题,在实践中查看运行时间,我不确定这样做的速度实际上更快还请注意,在计算完计数后,还有一些reduceByKey操作和其他我要处理的东西未显示,因为它们与问题无关。]
我真正想做的是这样的:
val (wordFeatureCounts, wordCounts, featureCounts) = sc.textFile(inputFile).flatMap(line => {
val word = getWordFromLine(line)
val features = getFeaturesFromLine(line)
val wfCounts = for (feature <- features) yield ((word, feature), 1)
val wCounts = for (feature <- features) yield (word, 1)
val fCounts = for (feature <- features) yield (feature, 1)
??.setOutput1(wfCounts)
??.setOutput2(wCounts)
??.setOutput3(fCounts)
})
有没有办法用spark做到这一点?在寻找方法时,在将结果保存到磁盘时我遇到了关于多个输出的问题(无用),并且我对累加器也有所了解(看起来不像我所需要的) ,仅此而已。
还要注意,我不能只将所有这些结果汇总在一个大列表中,因为我需要三张单独的地图。如果有一种有效的方法可以在事后拆分组合的RDD,那可能行得通,但我想到的唯一方法是最终对数据进行四次迭代,而不是我当前进行的三遍(一次创建)组合的地图,然后进行三遍将其过滤到我实际想要的地图中。)
无法将一个RDD拆分为多个RDD。如果您考虑一下这将如何工作,这是可以理解的。假设您将RDD x = sc.textFile("x")
分为a = x.filter(_.head == 'A')
和b = x.filter(_.head == 'B')
。到目前为止,什么都没有发生,因为RDD很懒。但是现在您打印a.count
。因此,Spark打开文件,并在各行中进行迭代。如果该行以A
开头,则对其进行计数。但是,如何处理以B
开头的行呢?将来会打给b.count
吗?也许是b.saveAsTextFile("b")
,我们应该将这些行写在某个地方吗?我们目前不知道。使用Spark API不可能拆分RDD。
但是,只要您知道自己想要什么,什么都不会阻止您实施某些东西。如果要同时获取a.count
和b.count
,则可以将以A
开头的行映射到(1, 0)
,将以B
开头的行映射到(0, 1)
,然后以约数的方式对元组求和。如果要将B
中的行保存到文件中,而用A
进行行计数,则可以在map
之前的filter(_.head == 'B').saveAsTextFile
中使用聚合器。
唯一的通用解决方案是将中间数据存储在某个地方。一种选择是仅缓存输入(x.cache
)。另一个方法是一次将内容写到单独的目录中,然后将它们读回为单独的RDD。 (请参阅Write to multiple outputs by key Spark - one Spark job。)我们在生产中执行此操作,效果很好。
这是Spark相对于传统的map-reduce编程的主要缺点之一。可以将一个RDD / DF / DS转换为另一个RDD / DF / DS,但是您不能将一个RDD映射到多个输出中。为了避免重新计算,您需要将结果缓存到某个中间RDD中,然后运行多个映射操作以生成多个输出。但是,如果数据很大,则中间输出将溢出到磁盘上,并且缓存的优势不会那么大。在此处查看讨论-https://issues.apache.org/jira/browse/SPARK-1476。这是一个古老的吉拉,但很重要。请查看Mridul Muralidharan的评论。