Task not serializable in Scala on Databricks

Problem description — Votes: 0, Answers: 1

I am trying to implement a UDF in Databricks using Scala. Even after wrapping the function in a class that extends Serializable, I still get a Task not serializable error. Please refer to the code below:

var rkList = List[String]("")

class appendData extends Serializable{
  var cKey = ""

  def addKey(data:String):String={
    if(data=="")
    {
      return cKey
    }
    else
    {
      cKey=data
      return cKey
    }
  }

  def execute(dframe: DataFrame): DataFrame ={
    val keyAddUDF = udf[String, String](addKey)

    var df = dframe.withColumn("r_c",substring(col("val"),0,6))
    df = df.withColumn("r_k",when(col("r_c")===kHolder, substring(col("val"),pos,len)).otherwise(""))
    rkList = df.select(col("r_k")).distinct.collect.map(_(0).toString).toList.filter(_ != "")

    return df.withColumn("val",concat(col("val"),keyAddUDF(col("r_k")))).drop("r_k","r_c")
  }
}


df = (new appendData).execute(df)

scala apache-spark databricks azure-databricks serializable
1 Answer

0 votes

You should not put both the execute method and the UDF in the same class. When the UDF is built from an instance method, the closure captures the whole appendData instance (and everything it references), which is what triggers the Task not serializable error. Define the addKey function separately, for example:


def addKey(data: String): String = {
    var cKey = ""

    if (data == "") {
      cKey
    } else {
      cKey = data
      cKey
    }
}

val keyAddUDF = udf[String, String](addKey)


def transformDf(dframe: DataFrame): DataFrame = {
    val df = dframe
      .withColumn("r_c", substring(col("val"), 0, 6))
      .withColumn("r_k", when(col("r_c") === kHolder, substring(col("val"), pos, len)).otherwise(""))

    rkList = df.select(col("r_k")).distinct.collect.map(_(0).toString).toList.filter(_ != "")

    df.withColumn("val", concat(col("val"), keyAddUDF(col("r_k")))).drop("r_k", "r_c")
}


df = transformDf(df)
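
Another common way to sidestep the problem entirely is to build the UDF from a plain anonymous function inside an object, so the task closure contains nothing but the function itself. The sketch below is only illustrative: the object name KeyAppender is made up, kHolder, pos and len are passed in as parameters rather than taken from the surrounding notebook, and the driver-side rkList collection is left out.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat, substring, udf, when}

object KeyAppender {
  // Built from an anonymous function, so the closure holds no class instance
  // that Spark would need to serialize.
  val keyAddUDF = udf((data: String) => if (data == "") "" else data)

  def transformDf(dframe: DataFrame, kHolder: String, pos: Int, len: Int): DataFrame =
    dframe
      .withColumn("r_c", substring(col("val"), 0, 6))
      .withColumn("r_k", when(col("r_c") === kHolder, substring(col("val"), pos, len)).otherwise(""))
      .withColumn("val", concat(col("val"), keyAddUDF(col("r_k"))))
      .drop("r_k", "r_c")
}

// usage, e.g.: df = KeyAppender.transformDf(df, kHolder, pos, len)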

