My goal is to read two files, filter out stopwords, find the words common to both files, and for each common word keep the smaller of the two counts. After that, I want to sort the key-value pairs in descending order and display only the top 15 words. So I decided to do this:
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("common words")
val sc = new SparkContext(conf)
val stopwords_file = sc.textFile("files/stopwords.txt")
val file = sc.textFile("files/task1-input1.txt")
val file2 = sc.textFile("files/task1-input2.txt")
val stopwords = stopwords_file.collect()
val counts1 = file.flatMap(line => line.split(" "))
.map(word => word.toLowerCase())
.filter(!stopwords.contains(_))
.map(word => (word, 1))
.reduceByKey(_ + _)
val counts2 = file2.flatMap(line => line.split(" "))
.map(word => word.toLowerCase())
.filter(!stopwords.contains(_))
.map(word => (word, 1))
.reduceByKey(_ + _)
.join(counts1)
.collect()
.map{ //this line gives the error
case (k, (v1,v2)) if v1 < v2 => (k,v1) // if the count from file1 is smaller, use it for that word; else use the count from file2
}
val result = sc.parallelize(counts2).sortBy(_._2, false).take(15)
result.foreach(println)
This gives me the following error:
Exception in thread "main" scala.MatchError: (http://www.gutenberg.org,(1,1)) (of class scala.Tuple2) at line 30 (the map by case line)
I don't really understand this error, and any help or suggestions for other approaches would be greatly appreciated. I am new to Spark; in Hadoop I would map each file to different key-value pairs, feed them as input to a reduce function, and compare the values inside the reducer, but I'm not sure what the equivalent is in Spark. Thanks.
The guard expression makes your pattern match non-exhaustive. Consider:
val f: PartialFunction[(String, (Int, Int)), (String, Int)] = {
case (k, (v1,v2)) if v1 < v2 => (k,v1)
}
f.isDefinedAt(("foo", (0, 1)))
// Boolean = true
vs.
f.isDefinedAt(("foo", (1, 0)))
// Boolean = false
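When the same case literal is passed where a total function is expected, such as to `map`, the compiler turns it into a `Function1` that throws on any input no case covers. This reproduces the asker's error in miniature (a sketch using a plain `Seq` instead of an RDD):

```scala
// The guarded case covers (0, 1) but not (1, 0), so mapping
// over both inputs throws at runtime on the uncovered one:
Seq(("foo", (0, 1)), ("foo", (1, 0))).map {
  case (k, (v1, v2)) if v1 < v2 => (k, v1)
}
// throws scala.MatchError: (foo,(1,0)) (of class scala.Tuple2)
```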
You should either remove the guard expression:
val g: PartialFunction[(String, (Int, Int)), (String, Int)] = {
case (k, (v1,v2)) => if(v1 < v2) (k,v1) else (k, v2)
}
g.isDefinedAt(("foo", (1, 0)))
// Boolean = true
g.isDefinedAt(("foo", (0, 1)))
// Boolean = true
or provide a default case:
val h: PartialFunction[(String, (Int, Int)), (String, Int)] = {
case (k, (v1,v2)) if v1 < v2 => (k, v1)
case (k, (_, v2)) => (k, v2)
}
h.isDefinedAt(("foo", (0, 1)))
// Boolean = true
h.isDefinedAt(("foo", (1, 0)))
// Boolean = true
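Applied to the original pipeline, the guard can be avoided entirely with `mapValues` and `math.min`, which also removes the `collect`/`parallelize` round-trip (a sketch, assuming `counts1` and `counts2` name the two per-file `(word, count)` RDDs, i.e. `counts2` built without the trailing `join`/`collect`/`map`):

```scala
val result = counts2
  .join(counts1)                                   // (word, (count2, count1))
  .mapValues { case (v1, v2) => math.min(v1, v2) } // keep the smaller count
  .sortBy(_._2, ascending = false)                 // descending by count
  .take(15)
result.foreach(println)
```

Because `min` is total over both counts, there is no partial function and no `MatchError` to worry about, and the sort and `take(15)` run on the RDD directly.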