我正在尝试使用 Scala 在 Databricks 中实现一个 UDF。即使把函数封装在类中并让该类继承 Serializable,仍然会出现 "Task not serializable"(任务不可序列化)错误。请参考以下代码:
// Holds the collected "r_k" values; starts as a single empty string and is
// overwritten with the distinct non-empty values each time the transformation runs.
var rkList: List[String] = List("")
/**
 * Appends a carried-forward key to the "val" column of a DataFrame.
 *
 * `cKey` holds the last non-empty key handed to `addKey`.
 */
class appendData extends Serializable {
  /** Last non-empty key passed to `addKey`; empty until one has been seen. */
  var cKey = ""

  /**
   * Remembers `data` when it is non-empty and returns the current key.
   *
   * @param data key for the current row, or "" when the row has none
   * @return `data` if it is non-empty, otherwise the most recently remembered key
   */
  def addKey(data: String): String = {
    if (data != "") cKey = data
    cKey
  }

  /**
   * Derives the helper columns "r_c" and "r_k" from "val", records the distinct non-empty
   * "r_k" values in `rkList`, and appends the carried-forward key to "val".
   *
   * Side effect: `rkList` is overwritten, which runs a `collect` on the driver.
   *
   * @param dframe input DataFrame; must contain a "val" column
   * @return the DataFrame with "val" extended and the helper columns dropped
   */
  def execute(dframe: DataFrame): DataFrame = {
    // Build the UDF from a local closure rather than from `addKey`: eta-expanding the
    // method captures `this` in the function Spark has to serialize, and in a notebook
    // that presumably drags in the enclosing cell state — the likely source of the
    // reported "Task not serializable" error. The closure below only carries its own
    // `lastKey` cell, seeded from the current cKey, and applies the same rule as addKey.
    // NOTE(review): the result depends on the order in which rows reach the UDF within a
    // task — confirm the input ordering/partitioning is what you expect.
    var lastKey = cKey
    val carryKey: String => String = data => {
      if (data != "") lastKey = data
      lastKey
    }
    val keyAddUDF = udf[String, String](carryKey)

    // r_c: substring of "val" at position 0, length 6; compared against kHolder below.
    val withPrefix = dframe.withColumn("r_c", substring(col("val"), 0, 6))
    // r_k: the key slice of "val" on rows whose r_c equals kHolder, "" on all other rows.
    val withKey = withPrefix.withColumn(
      "r_k",
      when(col("r_c") === kHolder, substring(col("val"), pos, len)).otherwise("")
    )

    rkList = withKey.select(col("r_k")).distinct.collect.map(_(0).toString).toList.filter(_ != "")

    withKey
      .withColumn("val", concat(col("val"), keyAddUDF(col("r_k"))))
      .drop("r_k", "r_c")
  }
}
// Run the transformation on a fresh appendData instance and rebind df to the result.
// assumes df is a var DataFrame declared earlier — confirm
df = new appendData().execute(df)
您不应该把 execute 方法和 UDF 所用的方法放在同一个类中。请单独定义 addKey 函数,例如:
/**
 * Returns the key to append for one row.
 *
 * No state is kept between calls, so the result is always the argument itself:
 * an empty string stays empty and any other value comes back unchanged.
 *
 * @param data key for the current row, or "" when there is none
 * @return `data` unchanged
 */
def addKey(data: String): String =
  if (data == "") "" else data
// Wrap addKey as a Spark UDF that takes one String column and returns a String.
val keyAddUDF = udf[String, String](addKey _)
/**
 * Appends the key produced by `keyAddUDF` to the "val" column.
 *
 * Side effect: overwrites `rkList` with the distinct non-empty "r_k" values, which
 * runs a `collect` on the driver.
 *
 * @param dframe input DataFrame; must contain a "val" column
 * @return the DataFrame with "val" extended and the helper columns "r_k"/"r_c" dropped
 */
def transformDf(dframe: DataFrame): DataFrame = {
  // r_c: substring of "val" at position 0, length 6; compared against kHolder below.
  val withPrefix = dframe.withColumn("r_c", substring(col("val"), 0, 6))

  // r_k: the key slice of "val" on rows whose r_c equals kHolder, "" on all other rows.
  val keyColumn = when(col("r_c") === kHolder, substring(col("val"), pos, len)).otherwise("")
  val withKey = withPrefix.withColumn("r_k", keyColumn)

  val distinctKeys = withKey.select(col("r_k")).distinct.collect.map(_(0).toString)
  rkList = distinctKeys.toList.filter(_ != "")

  withKey
    .withColumn("val", concat(col("val"), keyAddUDF(col("r_k"))))
    .drop("r_k", "r_c")
}
// Apply the transformation and rebind df to the result.
// assumes df is a var DataFrame declared earlier — confirm
df = transformDf(df)