在Spark-shell中执行少量转换后,我得到了如下输出。我是从REPL复制并粘贴的。
scala> s.collect res44:Array [(String,String)] = Array((45000,Pavan,Amit),(50000,Bhupesh,Tejas,Dinesh)
现在我需要为每个ID生成单独的文件,ID包含在文件名中,如下所示
The file with name ID45000.txt should have below content
45000,Pavan
45000,Amit
The file with name ID50000.txt should have below content.
50000,Bhupesh
50000,Tejas
50000,Dinesh
尝试下面的代码,但没有工作
s.foreach{case(k,v) => flatMapValues(x=>x.split(",")).saveAsTextFile("ID"+k+".txt")}
专家能帮助我吗?
鉴于你的s
rdd
Array[(String, String)] = Array((45000,Pavan,Amit), (50000,Bhupesh,Tejas,Dinesh)
执行以下操作应该有效(尽管不是有效的解决方案)。
val groupedRDD = s.flatMap{case(k, v)=> v.split(",").map(x => (k, x))}.groupByKey()
val keySetToCheck = groupedRDD.collect().toMap.keySet
for(key <- keySetToCheck){
groupedRDD.filter(x => x._1 == key).saveAsTextFile("ID"+key+".txt")
}
我希望答案是有帮助的