使用定义函数Spark 2.4?

问题描述 投票:1回答:1

我正在运行一个kmeans算法,我创建了一个VectorAssembler,将inputcols设置为(“经度”,“纬度”)和outputCol(“位置”)。我需要将我的数据从json文件聚类到3个集群。我按经度和纬度对数据进行分类,然后创建Vector Location来连接两者。位置和纬度是DoubleType。我认为它是因为位置向量我得到以下错误:

19/04/08 15:20:56 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$1629/684426930: (struct<latitude:double,longitude:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)

这是我的代码:

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.types.{DataTypes, DoubleType, StructType}
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.feature
import org.apache.spark.{SparkConf, SparkContext, sql}
import org.apache.spark.ml.feature.{Binarizer, Interaction, VectorAssembler}
import org.apache.spark.sql
import org.apache.spark.sql.expressions.UserDefinedFunction
//plotting




object Clustering_kmeans {

  def main(args: Array[String]): Unit = {

    println("hello world me")

    // Spark Session
   val  sc = SparkSession.builder().appName("Clustering_Kmeans").master("local[*]").getOrCreate()


    import sc.implicits._
    sc.sparkContext.setLogLevel("WARN")
    // Loads data.
    val stations = sc.sqlContext.read.option("multiline",true).json("/home/aymenstien/Téléchargements/Brisbane_CityBike.json")
// trans
    val st = stations.withColumn("longitude", $"longitude".cast(sql.types.DoubleType))
      .withColumn("latitude", $"latitude".cast(sql.types.DoubleType)).cache()


    val stationVA = new VectorAssembler().setInputCols(Array("latitude","longitude")).setOutputCol("location")
    val stationWithLoc =stationVA.transform(st)


//print

    stationWithLoc.show(truncate = false)
    stationWithLoc.printSchema()
    //val x = st.select('longitude).as[Double].collect()
    //val y = st.select('latitude).as[Double].collect()

//st.printSchema()

  }

}

这是Schema

root
 |-- address: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- id: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
 |-- position: string (nullable = true)
 |-- location: vector (nullable = true)

scala apache-spark apache-spark-ml scala-2.12
1个回答
0
投票

由于你有所有功能的nullable = true,如果你有任何空值,VectorAssembler将抛出一个错误。尝试将handleInvalid设置为"skip"。这将过滤掉任何空值的行。

val stationVA = new VectorAssembler().
                     setInputCols(Array("latitude","longitude")).
                     setOutputCol("location").
                     setHandleInvalid("skip")

© www.soinside.com 2019 - 2024. All rights reserved.