从 scala 中的 CSV 文件加载时,我需要从数据框中跳过三行

问题描述 投票:0回答:4

我正在将 CSV 文件加载到数据框中,我可以做到这一点,但我需要跳过文件中的前三行。

我尝试了

.option()
命令,将 header 设置为 true,但它忽略了唯一的第一行。

val df = spark.sqlContext.read
    .schema(Myschema)
    .option("header",true)
    .option("delimiter", "|")
    .csv(path)

我想将标题设置为 3 行,但我找不到方法。

替代想法:从数据框中跳过这 3 行

请帮我解决这个问题。预先感谢。

scala apache-spark bigdata
4个回答
1
投票

处理问题的通用方法是对数据帧建立索引并过滤大于 2 的索引。

直接的方法:

正如另一个答案中所建议的,您可以尝试使用

monotonically_increasing_id
添加索引。

df.withColumn("Index",monotonically_increasing_id)
  .filter('Index > 2)
  .drop("Index")

但是,只有前 3 行位于第一个分区中时,这才有效。此外,正如评论中提到的,今天的情况就是这样,但是这段代码可能会随着进一步的版本或火花而完全破坏,这将很难调试。事实上,API 中的约定只是“生成的 ID 保证单调递增且唯一,但不连续”。因此,假设它们总是从零开始是不太明智的。在当前版本中甚至可能有其他情况不起作用(但我不确定)。

为了说明我的第一个担忧,请看一下:

scala> spark.range(4).withColumn("Index",monotonically_increasing_id()).show()
+---+----------+
| id|     Index|
+---+----------+
|  0|         0|
|  1|         1|
|  2|8589934592|
|  3|8589934593|
+---+----------+

我们只会删除两行...

安全方法:

之前的方法在大多数情况下都有效,但为了安全起见,您可以使用 RDD API 中的

zipWithIndex
来获取连续索引。

def zipWithIndex(df : DataFrame, name : String) : DataFrame = {
  val rdd = df.rdd.zipWithIndex
    .map{ case (row, i) => Row.fromSeq(row.toSeq :+ i) }
  val newSchema = df.schema
    .add(StructField(name, LongType, false))
  df.sparkSession.createDataFrame(rdd, newSchema)
}
zipWithIndex(df, "index").where('index > 2).drop("index")

我们可以检查它是否更安全:

scala> zipWithIndex(spark.range(4).toDF("id"), "index").show()
+---+-----+
| id|index|
+---+-----+
|  0|    0|
|  1|    1|
|  2|    2|
|  3|    3|
+---+-----+

0
投票

df1 = spark.read.format("csv")\ .option("header","false")\ .option("skiprows","1")\ .schema(my_schema)\ .load("/FileStore/tables/2010_summary.csv") df1.display()


-1
投票

您可以尝试这个选项

df.withColumn("Index",monotonically_increasing_id())
        .filter(col("Index") > 2)
        .drop("Index")

-1
投票

您可以尝试将 wrt 更改为您的架构。

 import org.apache.spark.sql.Row
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  //Read CSV
  val file = sc.textFile("csvfilelocation")

  //Remove first 3 lines
  val data = file.mapPartitionsWithIndex{ (idx, iter) => if (idx == 0) iter.drop(3) else iter }

  //Create RowRDD by mapping each line to the required fields 
  val rowRdd = data.map(x=>Row(x(0), x(1)))

  //create dataframe by calling sqlcontext.createDataframe with rowRdd and your schema   
  val df = sqlContext.createDataFrame(rowRdd, schema)
© www.soinside.com 2019 - 2024. All rights reserved.