传递结构到UDAF火花

问题描述 投票:1回答:1

我有以下的模式 -

root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |    |-- car3: string (nullable = true)
 |-- name: string (nullable = true)

我怎样才能通过结构“车”到UDAF?应该是什么inputSchema,如果我只是想通过汽车子结构。

scala apache-spark hadoop apache-spark-sql user-defined-functions
1个回答
2
投票

你可以,但UDAF的逻辑是不同的。举例来说,如果你有两行:

val seq = Seq(cars(cars_schema("car1", "car2", "car3")), (cars(cars_schema("car1", "car2", "car3"))))

val rdd = spark.sparkContext.parallelize(seq)

这里的模式是

root
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |    |-- car3: string (nullable = true)

那么,如果你尝试调用聚集:

val df = seq.toDF
df.agg(agg0(col("cars")))

你必须改变你的UDAFs输入模式,如:

val carsSchema =
    StructType(List(StructField("car1", StringType, true), StructField("car2", StringType, true), StructField("car3", StringType, true)))

并在UDAF的男孩,你必须处理这种模式变更的inputSchema:

override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)

在您的更新方法,你必须面对你的输入行的格式:

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val i = input.getAs[Array[Array[String]]](0)
  // i here would be [car1,car2,car3],  an array of strings
  buffer(0) = ???
}

从一个在这里,你可以将我更新您的缓冲区,并完成合并,并评估功能。

© www.soinside.com 2019 - 2024. All rights reserved.