SparkSQL-Scala程序:第一列应包含每行(col1,col2)中最低的字符串

问题描述 投票:1回答:1

需要输出,其中第一列应包含每行的(column1,column2)中最低的字符串。

我收到错误:线程“ main” org.apache.spark.SparkException中的异常:任务无法序列化

我正在尝试使用UDF返回两个字符串的最小值和最大值,并在sql语句中使用它。

“从朋友中选择minUDF(name1,name2),maxUDF(name1,name2)”

不知道我在哪里做错了。有人可以帮我发现我的错误吗?

Input :           Required Output :
+-----+-----+     +-----+-----+
|name1|name2|     |name1|name2|
+-----+-----+     +-----+-----+
| shir| amit|     | amit| shir|
| bane| shir|     | bane| shir|
| shir| raj |     |  raj| shir|
| amit| shir|     | amit| shir|
| xiag| alan|     | alan| xiag|
| shir|  raj|     |  raj| shir|
+-----+-----+     +-----+-----+

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object test {

  def main(args: Array[String]): Unit = {


    val spark=SparkSession.builder().appName("test").master("local[*]").getOrCreate();
    val sc=spark.sparkContext

    case class friends(name1:String,name2:String)
    val rdd=sc.parallelize(Seq(Row("shir","amit"),Row("amit","shir"),Row("raj","shir"),Row("amit","shir"),Row("raj","shir"),Row("shir","raj")))

    val schema=StructType(Array(
      StructField("name1",StringType,true),
      StructField("name2",StringType,true)
    ))

    val df=spark.createDataFrame(rdd, schema)
    df.show()

    val minfun = (str1:String,str2:String)=>{

      if(str1.compareTo(str2)<0)
        return str1
      else
        return str2

    }

    val maxfun = (str1:String,str2:String)=>{

      if(str1.compareTo(str2)>0)
        return str1
      else
        return str2

    }

    spark.udf.register("minUDF",minfun)
    spark.udf.register("maxUDF",maxfun)

    df.createOrReplaceTempView("friends")
    val ddd = spark.sql("select minUDF(name1,name2), maxUDF(name1,name2) from friends")
    ddd.show

    spark.stop()

  }
}```

scala dataframe apache-spark apache-spark-sql
1个回答
0
投票

[当我们具有相同的内置函数时,请不要编写UDF:

尝试使用SQL的最小和最大函数来计算最小和最大:

import org.apache.spark.sql.functions.least

import org.apache.spark.sql.functions.greatest

val ddd = spark.sql("select least(name1,name2), greatest(name1,name2) from friends")

ddd.show

也要避免使用UDF,因为它们需要在所有辅助节点之间进行序列化,这将导致您承担序列化和反序列化的费用。

© www.soinside.com 2019 - 2024. All rights reserved.