我对 scala 非常陌生,现在我正在努力计算 Dstream 的每个 RDD 的单词共现频率。 到目前为止,当我没有重复的单词时,我的代码可以正常工作,但是当我这样做时,它会跳过它们,当我删除过滤器功能时,它会计算每个单词本身......
任何帮助将不胜感激。谢谢!
这是我的输入:
像猪一样蜂巢
这是我带有过滤器的代码:
val coOccurrenceCountsB = rdd.flatMap { line =>
val words = line.split("\\s+").filter(word => word.matches("[a-zA-Z]+") && word.length >= 3)
words.flatMap { word1 =>
words.filter(word2 => word2 != word1).map(word2 => ((word1, word2), 1))
}
}.reduceByKey(_ + _)
和我的输出:
((猪,蜂巢),1) ((蜂巢,类似),2) ((如蜂巢),2) ((猪,像),2) ((蜂巢,猪),1) ((如猪),2)
我没有过滤器的输出:
((猪,蜂巢),1) ((猪,猪),1) ((蜂巢,类似),2) ((如蜂巢),2) ((猪,像),2) ((蜂巢,猪),1) ((喜欢,喜欢),4) ((如猪),2) ((蜂巢,蜂巢),1)
我想要什么:
((猪,蜂巢),1) ((蜂巢,类似),2) ((如蜂巢),2) ((猪,像),2) ((蜂巢,猪),1) ((如猪),2) ((喜欢,喜欢),2)
可以应用按位置过滤而不是按值过滤,使用
zipWithIndex
:
val words = line.split("\\s+").filter(word => word.matches("[a-zA-Z]+") && word.length >= 3)
val indexed = words.zipWithIndex
val result = indexed.flatMap { case (word1, index1) =>
indexed
.filter { case (_, index2) => index1 != index2 }
.map { case (word2, _) => ((word1, word2), 1) }
}