I have the following UDF. I'm trying to use it to add a new path to the DF by updating the existing path column.
def udfExample = udf((newPath: String, oldPath: String) => {
  val elements = oldPath.split("/")
  val fileName = elements.last
  // Keep only Hive-style partition segments, e.g. "date=20200218"
  val partitions = elements.filter(_.split("=").length > 1)
  if (partitions.isEmpty)
    newPath + "/" + fileName
  else
    newPath + "/" + partitions.mkString("/") + "/" + fileName
})
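The UDF body can be exercised outside Spark as a plain function, which makes the path logic easy to check on its own. A minimal sketch, where rewritePath is a hypothetical name for the same logic (note the "/" separator between the partition segments and the file name):

```scala
// Plain-Scala version of the UDF body, no Spark needed.
def rewritePath(newPath: String, oldPath: String): String = {
  val elements = oldPath.split("/")
  val fileName = elements.last
  // Hive-style partition segments contain "=", e.g. "date=20200218"
  val partitions = elements.filter(_.split("=").length > 1)
  if (partitions.isEmpty) newPath + "/" + fileName
  else newPath + "/" + partitions.mkString("/") + "/" + fileName
}

println(rewritePath("hdfs://new/base", "hdfs://nabc/d/e/batched-202002180"))
// → hdfs://new/base/batched-202002180
println(rewritePath("hdfs://new/base", "hdfs://nabc/d/e/date=20200218/batched-202002180"))
// → hdfs://new/base/date=20200218/batched-202002180
```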
When calling the UDF, I'm not sure how to pass arguments to it, since it only works on Columns. How can the UDF replace oldPath with newPath in the path column? I have the following DF:
+-------+--------+----------------------------------+
|   col1|    col2|                              path|
+-------+--------+----------------------------------+
|    200|20200218| hdfs://nabc/d/e/batched-202002180|
|    207|20200218| hdfs://nabc/d/e/batched-202002190|
+-------+--------+----------------------------------+
The following doesn't seem to work:
val a = someDF.withColumn("path", when(col("path") =!= lit(""), udfExample(col("path"), col("path"))).otherwise(lit("")))
Suppose the new path is "batched-999999999". How can I update the path column with this new value?
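Since a Spark UDF only accepts Column arguments, a constant such as the new path is normally passed by wrapping it in lit(...) at the call site, e.g. udfExample(lit("hdfs://nabc/d/e/batched-999999999"), col("path")). Setting Spark aside, the when/otherwise guard can be mirrored in plain Scala to sanity-check the intended behavior; updatePath below is a hypothetical stand-in, simplified for sample paths with no partition segments:

```scala
// Hypothetical mirror of:
//   when(col("path") =!= lit(""), udfExample(lit(newBase), col("path"))).otherwise(lit(""))
def updatePath(newBase: String, oldPath: String): String =
  if (oldPath.isEmpty) ""  // the .otherwise(lit("")) branch
  else newBase + "/" + oldPath.split("/").last

println(updatePath("hdfs://new/base", "hdfs://nabc/d/e/batched-202002180"))
// → hdfs://new/base/batched-202002180
println(updatePath("hdfs://new/base", ""))
// → (empty string: empty paths stay empty)
```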
+-------+--------+----------------------------------+
|   col1|    col2|                              path|
+-------+--------+----------------------------------+
|    200|20200218| hdfs://nabc/d/e/batched-999999999|
|    207|20200218| hdfs://nabc/d/e/batched-202002190|
+-------+--------+----------------------------------+
Try using a window function instead of the UDF:
import spark.implicits._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
val windowSpec = Window.orderBy(lit(1))
val a = someDF.withColumn("path", when(col("path") =!= lit(""), first(col("path")).over(windowSpec)).otherwise(lit("")))