使用 scala 计算重复单词的共现情况

问题描述 投票:0回答:1

我对 scala 非常陌生,现在我正在努力计算 Dstream 的每个 RDD 的单词共现频率。 到目前为止,当我没有重复的单词时,我的代码可以正常工作,但是当我这样做时,它会跳过它们,当我删除过滤器功能时,它会计算每个单词本身......

任何帮助将不胜感激。谢谢!

这是我的输入:

像猪一样蜂巢

这是我带有过滤器的代码:

 val coOccurrenceCountsB = rdd.flatMap { line =>
          val words = line.split("\\s+").filter(word => word.matches("[a-zA-Z]+") && word.length >= 3)
          words.flatMap { word1 =>
            words.filter(word2 => word2 != word1).map(word2 => ((word1, word2), 1))
          }

      }.reduceByKey(_ + _)

和我的输出:

((猪,蜂巢),1) ((蜂巢,类似),2) ((如蜂巢),2) ((猪,像),2) ((蜂巢,猪),1) ((如猪),2)

我没有过滤器的输出:

((猪,蜂巢),1) ((猪,猪),1) ((蜂巢,类似),2) ((如蜂巢),2) ((猪,像),2) ((蜂巢,猪),1) ((喜欢,喜欢),4) ((如猪),2) ((蜂巢,蜂巢),1)

我想要什么:

((猪,蜂巢),1) ((蜂巢,类似),2) ((如蜂巢),2) ((猪,像),2) ((蜂巢,猪),1) ((如猪),2) ((喜欢,喜欢),2)

scala apache-spark hadoop
1个回答
0
投票

可以应用按位置过滤而不是按值过滤,使用

zipWithIndex
:

val words = line.split("\\s+").filter(word => word.matches("[a-zA-Z]+") && word.length >= 3)
val indexed = words.zipWithIndex
val result = indexed.flatMap { case (word1, index1) =>
  indexed
    .filter { case (_, index2) => index1 != index2 }
    .map { case (word2, _) => ((word1, word2), 1) }
}
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.