How do I replace the old path using a UDF? [Spark Scala UDF]

Problem description

I have the following UDF. I'm trying to use it to add a new path to the DF by updating the existing path column.

 def udfExample = udf((newPath: String, oldPath: String) => {
    val elements = oldPath.split("/")
    val fileName = elements.last
    // collect any key=value partition segments from the old path
    val partitions = scala.collection.mutable.ArrayBuffer[String]()
    for (e <- elements) {
      if (e.split("=").length > 1)
        partitions += e
    }
    if (partitions.isEmpty)
      newPath + "/" + fileName
    else
      newPath + "/" + partitions.mkString("/") + "/" + fileName
  })
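Independent of Spark, the path-rebuilding logic inside the UDF can be exercised as a plain function; a minimal sketch (the name `rebuildPath` is introduced here for illustration, not part of the original code):

```scala
// Same steps as the UDF body, written as a plain function so it can be
// tested without a SparkSession.
def rebuildPath(newPath: String, oldPath: String): String = {
  val elements = oldPath.split("/")
  val fileName = elements.last
  // keep only key=value partition segments from the old path
  val partitions = elements.filter(_.split("=").length > 1)
  if (partitions.isEmpty)
    newPath + "/" + fileName
  else
    newPath + "/" + partitions.mkString("/") + "/" + fileName
}

// No partition segments: only the file name is carried over.
// rebuildPath("hdfs://new/base", "hdfs://nabc/d/e/batched-202002180")
//   returns "hdfs://new/base/batched-202002180"
// With a partition segment, it is preserved between base and file name.
// rebuildPath("hdfs://new/base", "hdfs://nabc/dt=20200218/f.parquet")
//   returns "hdfs://new/base/dt=20200218/f.parquet"
```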

When calling the UDF, I'm not sure how to pass the arguments, since a UDF only accepts columns. How can the UDF replace oldPath with newPath in the path column? I have the following DF:

+-------+--------+-----------------------------------+
|col1   |    col2|                               path|
+-------+--------+-----------------------------------+
|    200|20200218|  hdfs://nabc/d/e/batched-202002180|
|    207|20200218|  hdfs://nabc/d/e/batched-202002190|
+-------+--------+-----------------------------------+

The following does not seem to work:

val a = someDF.withColumn("path", when(col("path") =!= lit(""), udfExample(col("path"), col("path"))).otherwise(lit("")))

Suppose the new path is "batched-999999999". How can I update the path column with this new value?

+-------+--------+-----------------------------------+
|col1   |    col2|                               path|
+-------+--------+-----------------------------------+
|    200|20200218|  hdfs://nabc/d/e/batched-999999999|
|    207|20200218|  hdfs://nabc/d/e/batched-202002190|
+-------+--------+-----------------------------------+
scala apache-spark apache-spark-sql user-defined-functions
1 Answer

Try using a window function instead of a UDF.

import spark.implicits._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

val windowSpec = Window.orderBy(lit(1))

val a = someDF.withColumn("path",
  when(col("path") =!= lit(""), first(col("path")).over(windowSpec))
    .otherwise(lit("")))
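If the intent is to keep the original UDF, the direct answer to "how do I pass a constant" is to wrap it in `lit(...)` so Spark treats it as a column. A sketch under that assumption (the value of `newBase` below is a placeholder for illustration, not taken from the answer):

```scala
import org.apache.spark.sql.functions.{col, lit, when}

// Hypothetical constant base path; lit(...) turns it into a column
// whose value is the same on every row, so the UDF can receive it.
val newBase = "hdfs://nabc/d/e"

val b = someDF.withColumn(
  "path",
  when(col("path") =!= lit(""), udfExample(lit(newBase), col("path")))
    .otherwise(lit("")))
```

This keeps the row-by-row behavior of the UDF (each row's old path is rewritten against the same constant base), whereas the window-function version above copies one row's path to the others.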
