通过全文本文件火花获取行号zipWithIndex

问题描述 投票:1回答:1

我有一个用例,其中我必须使用WholeTextFiles读取文件。但是,我需要在文件中产生行号。如果我使用:

val file=sc.wholeTextFiles("path").zipWithIndex

每个文件我得到一个行号。如何获得每个文件每行的行号?

scala apache-spark
1个回答
0
投票

一种简单的方法是使用flatMap来平坦化加载的RDD,该函数为每个文本文件逐行添加行号,如下所示:

import org.apache.spark.sql.Row

val rdd = sc.wholeTextFiles("/path/to/textfiles").
  flatMap{ case (fName, lines) =>
    lines.split("\\n").zipWithIndex.map{ case (line, idx) => (fName, idx, line) }
  }
// rdd: org.apache.spark.rdd.RDD[(String, Int, String)] = ...

Collect -RDD应该导致如下所示:

rdd.collect
// res1: Array[(String, Int, String)] = Array(
//   ("/path/to/file1", 0, "text line 1 in file1"),
//   ("/path/to/file1", 1, "text line 2 in file1"),
//   ("/path/to/file1", 2, "text line 3 in file1"),
//       ...
//   ("/path/to/file2", 0, "text line 1 in file2"),
//   ("/path/to/file2", 1, "text line 2 in file2"),
//       ...
//       ...
// )

推荐问答