Adding additional columns to a Spark DataFrame


I parse file paths into Spark DataFrames, and now I also want to add the path and the time to the resulting DataFrames as separate columns. This is the current solution (pathToDF is a helper method):

val paths = pathsDF
  .orderBy($"time")
  .select($"path")
  .as[String]
  .collect()

if(paths.nonEmpty) {
  paths
    .grouped(groupsNum.getOrElse(paths.length))
    .map(_.map(pathToDF).reduceLeft(_ union _))
} else {
  Seq.empty[DataFrame]
}
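
For reference, grouped(n) splits the collected paths into batches of at most n elements, so the expression above yields an iterator with one unioned DataFrame per batch; with groupsNum empty, getOrElse(paths.length) makes a single batch covering all paths. A quick plain-Scala illustration of the batching:

val xs = List("a", "b", "c", "d", "e")
xs.grouped(2).toList  // List(List(a, b), List(c, d), List(e))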

I am trying to do something similar, but I am not sure how to add the time column using withColumn:

    val orderedPaths = pathsDF
      .orderBy($"time")
      .select($"path")
   //.select($"path", $"time") for both columns

    val paths = orderedPaths
      .as[String]
      .collect()

    if (paths.nonEmpty) {
      paths
        .grouped(groupsNum.getOrElse(paths.length))
        .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
          .withColumn("path", orderedPaths("path")))
    //.withColumn("time", orderedPaths("time") something like this
    } else {
      Seq.empty[DataFrame]
    }

What is a better way to implement this?

Input DF:

time Long
path String

Current result:

resultDF schema
field1 Int
field2 String
....
fieldN String

Expected result:

resultDF schema
field1 Int
field2 String
....
path   String
time   Long
1 Answer

Please check the code below.

1. Change grouped to par to load the data in parallel.

2. Change the following:

// Problematic: orderedPaths("path") is a column of a different DataFrame,
// so it cannot supply the per-file path for the unioned file data.
paths.grouped(groupsNum.getOrElse(paths.length))
     .map(group => group.map(pathToDataDF).reduceLeft(_ union _)
     .withColumn("path", orderedPaths("path")))

to:

// This tags each file's rows with that file's own path before the union.
paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => {
    group.map(path => {
        pathToDataDF(path).withColumn("path", lit(path))
        }
    )
})
.reduceLeft(_ union _)
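
The lit(path) literal is the crucial difference: withColumn needs a Column that resolves against the DataFrame it is called on, and orderedPaths("path") belongs to a different query plan, so Spark rejects it with an AnalysisException. Tagging each per-file DataFrame before the union also guarantees that every row carries its own file's path. One caveat: union resolves columns by position, and per-file JSON schema inference can order columns differently, so a variant with unionByName (available since Spark 2.3, assuming all files share the same top-level fields) may be safer:

paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => group.map(path => pathToDataDF(path).withColumn("path", lit(path))))
.reduceLeft(_ unionByName _)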

For illustration, I show both the par and the grouped versions below.

[Note: ignore the helper methods I created to replicate yours, e.g. pathToDataDF.]

scala> val orderedPaths = Seq(("/tmp/data/foldera/foldera.json","2020-05-29 01:30:00"),("/tmp/data/folderb/folderb.json","2020-05-29 02:00:00"),("/tmp/data/folderc/folderc.json","2020-05-29 03:00:00")).toDF("path","time")
orderedPaths: org.apache.spark.sql.DataFrame = [path: string, time: string]

scala> def pathToDataDF(path: String) = spark.read.format("json").load(path)
pathToDataDF: (path: String)org.apache.spark.sql.DataFrame

// Sample file contents I used.

scala> import sys.process._
import sys.process._

scala> "cat /tmp/data/foldera/foldera.json".!
{"name":"Srinivas","age":29}

scala> "cat /tmp/data/folderb/folderb.json".!
{"name":"Ravi","age":20}

scala> "cat /tmp/data/folderc/folderc.json".!
{"name":"Raju","age":25}

Using par

scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)

scala> val parDF = paths match {
        case p if !p.isEmpty => {
            p.par
            .map(path => { 
                pathToDataDF(path)
                .withColumn("path",lit(path))
            }).reduceLeft(_ union _)
        }
        case _ => spark.emptyDataFrame
    }
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> parDF.show(false)
+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// With time column.

scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val parDF = paths match {
            case p if !p.isEmpty => {
                p.par
                .map(path => {
                    pathToDataDF(path._1)
                    .withColumn("path",lit(path._1))
                    .withColumn("time",lit(path._2))
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }

parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> parDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
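
A portability note on par: the snippets above assume Scala 2.11/2.12, where parallel collections ship with the standard library. On Scala 2.13+, .par requires the separate scala-parallel-collections module plus an import; a sketch of what that would look like:

// build.sbt (Scala 2.13+ only)
libraryDependencies += "org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4"

// in code, to re-enable .par
import scala.collection.parallel.CollectionConverters._

Note also that only the map runs in parallel here (each thread triggers its file's load and schema inference); reduceLeft is inherently sequential, so the union chain itself is not parallelized.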

Using grouped

scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)

scala> val groupsNum: Option[Int] = None // batch size, as in the question; None falls back to paths.length

scala> val groupedDF = paths match {
            case p if !p.isEmpty => {
                paths
                .grouped(groupsNum.getOrElse(paths.length))
                .flatMap(group => {
                    group
                    .map(path => { 
                        pathToDataDF(path)
                        .withColumn("path", lit(path))
                    })
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }

groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> groupedDF.show(false)

+---+--------+------------------------------+
|age|name    |path                          |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi    |/tmp/data/folderb/folderb.json|
|25 |Raju    |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+

// With time column.

scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))

scala> val groupedDF = paths match {
            case p if !p.isEmpty => {
                paths
                .grouped(groupsNum.getOrElse(paths.length))
                .flatMap(group => {
                    group
                    .map(path => {
                        pathToDataDF(path._1)
                        .withColumn("path",lit(path._1))
                        .withColumn("time",lit(path._2))
                    })
                }).reduceLeft(_ union _)
            }
            case _ => spark.emptyDataFrame
        }


groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]

scala> groupedDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name    |path                          |time               |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi    |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju    |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
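
Finally, a different angle not covered above: if the goal is just to tag each row with its source file and per-file metadata such as time, Spark's built-in input_file_name function can avoid the collect-and-loop entirely. A minimal sketch, assuming every file is readable by a single reader and that the path values in orderedPaths match the URIs Spark reports (input_file_name returns a full URI such as file:///tmp/..., so the join key may need normalizing first):

import org.apache.spark.sql.functions.input_file_name

val filePaths = orderedPaths.select($"path").as[String].collect()

val dataDF = spark.read.format("json")
  .load(filePaths: _*)                      // one read over all files
  .withColumn("path", input_file_name())    // source file URI per row

// Join the per-file metadata (time) back in by path.
val resultDF = dataDF.join(orderedPaths, Seq("path"), "left")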
