I parse Spark DataFrames from file paths, but now I also want to add the path and time to the resulting DataFrame as separate columns. This is my current solution (pathToDF is a helper method):
val paths = pathsDF
.orderBy($"time")
.select($"path")
.as[String]
.collect()
if(paths.nonEmpty) {
paths
.grouped(groupsNum.getOrElse(paths.length))
.map(_.map(pathToDF).reduceLeft(_ union _))
} else {
Seq.empty[DataFrame]
}
I am trying to do something similar, but I am not sure how to add the time column using withColumn:
val orderedPaths = pathsDF
.orderBy($"time")
.select($"path")
//.select($"path", $"time") for both columns
val paths = orderedPaths
.as[String]
.collect()
if (paths.nonEmpty) {
paths
.grouped(groupsNum.getOrElse(paths.length))
.map(group => group.map(pathToDataDF).reduceLeft(_ union _)
.withColumn("path", orderedPaths("path")))
//.withColumn("time", orderedPaths("time") something like this
} else {
Seq.empty[DataFrame]
}
What is a better way to implement this?
Input DF:
time Long
path String
Current result:
resultDF schema
field1 Int
field2 String
....
fieldN String
Expected result:
resultDF schema
field1 Int
field2 String
....
path String
time Long
Please check the code below.
1. Change grouped to par to load the data in parallel.
2. Change
// Below code will add same path for multiple files content.
paths.grouped(groupsNum.getOrElse(paths.length))
.map(group => group.map(pathToDataDF).reduceLeft(_ union _)
.withColumn("path", orderedPaths("path")))
to
// Below code will add same path for same file content.
paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => {
group.map(path => {
pathToDataDF(path).withColumn("path", lit(path))
}
)
})
.reduceLeft(_ union _)
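The grouped/flatMap shape can be sanity-checked on plain Scala collections, without Spark. A minimal sketch with made-up path strings, where tagging each element with a tuple stands in for withColumn("path", lit(path)):

```scala
// Hypothetical path list; a group size of 2 splits it into batches of two.
val paths = Vector("a.json", "b.json", "c.json")

val tagged = paths
  .grouped(2)                                       // Iterator(Vector(a.json, b.json), Vector(c.json))
  .flatMap(group => group.map(p => (p, s"df($p)"))) // tag each path with its own "loaded" data
  .toList
// tagged is List((a.json,df(a.json)), (b.json,df(b.json)), (c.json,df(c.json)))
```

Because the tagging happens inside the inner map, each file's rows carry that file's own path, which is exactly what the withColumn-per-path version above achieves.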
The examples below demonstrate both par and grouped.
[Note: ignore the places where I have tried to replicate your methods, such as pathToDataDF.]
scala> val orderedPaths = Seq(("/tmp/data/foldera/foldera.json","2020-05-29 01:30:00"),("/tmp/data/folderb/folderb.json","2020-05-29 02:00:00"),("/tmp/data/folderc/folderc.json","2020-05-29 03:00:00")).toDF("path","time")
orderedPaths: org.apache.spark.sql.DataFrame = [path: string, time: string]
scala> def pathToDataDF(path: String) = spark.read.format("json").load(path)
pathToDataDF: (path: String)org.apache.spark.sql.DataFrame
//Sample File content I have taken.
scala> "cat /tmp/data/foldera/foldera.json".!
{"name":"Srinivas","age":29}
scala> "cat /tmp/data/folderb/folderb.json".!
{"name":"Ravi","age":20}
scala> "cat /tmp/data/folderc/folderc.json".!
{"name":"Raju","age":25}
Using par
scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)
scala> val parDF = paths match {
case p if !p.isEmpty => {
p.par
.map(path => {
pathToDataDF(path)
.withColumn("path",lit(path))
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]
scala> parDF.show(false)
+---+--------+------------------------------+
|age|name |path |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi |/tmp/data/folderb/folderb.json|
|25 |Raju |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+
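Why par helps: the expensive step is the map (each pathToDataDF call triggers a spark.read), and a parallel collection runs those calls concurrently while reduceLeft still combines the results in the original path order. The same effect can be sketched with the standard library's Future, where load below is a hypothetical stand-in for pathToDataDF:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for pathToDataDF: "loading" a path just yields a string.
def load(path: String): String = s"df($path)"

val paths = Vector("a.json", "b.json", "c.json")

// Start all loads concurrently (what .par does for the map step)...
val futures = paths.map(p => Future(load(p)))

// ...then combine the results in the original path order,
// analogous to reduceLeft(_ union _).
val combined = Await.result(Future.sequence(futures), 10.seconds)
  .reduceLeft((a, b) => s"$a union $b")
// combined == "df(a.json) union df(b.json) union df(c.json)"
```

Future.sequence preserves the input order, so even though the loads finish in any order, the final union order still matches the time-sorted path order.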
// With time column.
scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))
scala> val parDF = paths match {
case p if !p.isEmpty => {
p.par
.map(path => {
pathToDataDF(path._1)
.withColumn("path",lit(path._1))
.withColumn("time",lit(path._2))
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
parDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]
scala> parDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name |path |time |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+
Using grouped
scala> val paths = orderedPaths.orderBy($"time").select($"path").as[String].collect
paths: Array[String] = Array(/tmp/data/foldera/foldera.json, /tmp/data/folderb/folderb.json, /tmp/data/folderc/folderc.json)
scala> val groupedDF = paths match {
case p if !p.isEmpty => {
paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => {
group
.map(path => {
pathToDataDF(path)
.withColumn("path", lit(path))
})
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]
scala> groupedDF.show(false)
+---+--------+------------------------------+
|age|name |path |
+---+--------+------------------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|
|20 |Ravi |/tmp/data/folderb/folderb.json|
|25 |Raju |/tmp/data/folderc/folderc.json|
+---+--------+------------------------------+
// With time column.
scala> val paths = orderedPaths.orderBy($"time").select($"path",$"time").as[(String,String)].collect
paths: Array[(String, String)] = Array((/tmp/data/foldera/foldera.json,2020-05-29 01:30:00), (/tmp/data/folderb/folderb.json,2020-05-29 02:00:00), (/tmp/data/folderc/folderc.json,2020-05-29 03:00:00))
scala> val groupedDF = paths match {
case p if !p.isEmpty => {
paths
.grouped(groupsNum.getOrElse(paths.length))
.flatMap(group => {
group
.map(path => {
pathToDataDF(path._1)
.withColumn("path",lit(path._1))
.withColumn("time",lit(path._2))
})
}).reduceLeft(_ union _)
}
case _ => spark.emptyDataFrame
}
groupedDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 2 more fields]
scala> groupedDF.show(false)
+---+--------+------------------------------+-------------------+
|age|name |path |time |
+---+--------+------------------------------+-------------------+
|29 |Srinivas|/tmp/data/foldera/foldera.json|2020-05-29 01:30:00|
|20 |Ravi |/tmp/data/folderb/folderb.json|2020-05-29 02:00:00|
|25 |Raju |/tmp/data/folderc/folderc.json|2020-05-29 03:00:00|
+---+--------+------------------------------+-------------------+