需要输出,其中第一列应包含每行的(column1,column2)中最低的字符串。
我收到错误:线程“ main” org.apache.spark.SparkException中的异常:任务无法序列化
我正在尝试使用UDF返回两个字符串的最小值和最大值,并在sql语句中使用它。
“从朋友中选择minUDF(name1,name2),maxUDF(name1,name2)”
不知道我在哪里做错了。有人可以帮我发现我的错误吗?
Input : Required Output :
+-----+-----+ +-----+-----+
|name1|name2| |name1|name2|
+-----+-----+ +-----+-----+
| shir| amit| | amit| shir|
| bane| shir| | bane| shir|
| shir| raj | | raj| shir|
| amit| shir| | amit| shir|
| xiag| alan| | alan| xiag|
| shir| raj| | raj| shir|
+-----+-----+ +-----+-----+
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
object test {
def main(args: Array[String]): Unit = {
val spark=SparkSession.builder().appName("test").master("local[*]").getOrCreate();
val sc=spark.sparkContext
case class friends(name1:String,name2:String)
val rdd=sc.parallelize(Seq(Row("shir","amit"),Row("amit","shir"),Row("raj","shir"),Row("amit","shir"),Row("raj","shir"),Row("shir","raj")))
val schema=StructType(Array(
StructField("name1",StringType,true),
StructField("name2",StringType,true)
))
val df=spark.createDataFrame(rdd, schema)
df.show()
val minfun = (str1:String,str2:String)=>{
if(str1.compareTo(str2)<0)
return str1
else
return str2
}
val maxfun = (str1:String,str2:String)=>{
if(str1.compareTo(str2)>0)
return str1
else
return str2
}
spark.udf.register("minUDF",minfun)
spark.udf.register("maxUDF",maxfun)
df.createOrReplaceTempView("friends")
val ddd = spark.sql("select minUDF(name1,name2), maxUDF(name1,name2) from friends")
ddd.show
spark.stop()
}
}```
[当我们具有相同的内置函数时,请不要编写UDF:
尝试使用SQL的最小和最大函数来计算最小和最大:
import org.apache.spark.sql.functions.least
import org.apache.spark.sql.functions.greatest
val ddd = spark.sql("select least(name1,name2), greatest(name1,name2) from friends")
ddd.show
也要避免使用UDF,因为它们需要在所有辅助节点之间进行序列化,这将导致您承担序列化和反序列化的费用。