我有一个用例,其中我必须使用WholeTextFiles读取文件。但是,我需要在文件中产生行号。如果我使用:
val file=sc.wholeTextFiles("path").zipWithIndex
每个文件我得到一个行号。如何获得每个文件每行的行号?
一种简单的方法是使用flatMap
来平坦化加载的RDD,该函数为每个文本文件逐行添加行号,如下所示:
import org.apache.spark.sql.Row
val rdd = sc.wholeTextFiles("/path/to/textfiles").
flatMap{ case (fName, lines) =>
lines.split("\\n").zipWithIndex.map{ case (line, idx) => (fName, idx, line) }
}
// rdd: org.apache.spark.rdd.RDD[(String, Int, String)] = ...
Collect
-RDD应该导致如下所示:
rdd.collect
// res1: Array[(String, Int, String)] = Array(
// ("/path/to/file1", 0, "text line 1 in file1"),
// ("/path/to/file1", 1, "text line 2 in file1"),
// ("/path/to/file1", 2, "text line 3 in file1"),
// ...
// ("/path/to/file2", 0, "text line 1 in file2"),
// ("/path/to/file2", 1, "text line 2 in file2"),
// ...
// ...
// )