如何使用Raw Spark SQL为spark structured streaming 2.3.0中的每个组选择最大行? [重复]

问题描述 投票:1回答:1

这个问题在这里已有答案:

如何在不使用order bymapGroupWithState的情况下为spark structure 2.3.0中的每个组选择最大行?

输入:

id | amount     | my_timestamp
-------------------------------------------
1  |      5     |  2018-04-01T01:00:00.000Z
1  |     10     |  2018-04-01T01:10:00.000Z
2  |     20     |  2018-04-01T01:20:00.000Z
2  |     30     |  2018-04-01T01:25:00.000Z
2  |     40     |  2018-04-01T01:30:00.000Z

预期产出:

id | amount     | my_timestamp
-------------------------------------------
1  |     10     |  2018-04-01T01:10:00.000Z
2  |     40     |  2018-04-01T01:30:00.000Z

寻找使用像sparkSession.sql("sql query")或类似于原始sql的原始sql但不像mapGroupWithState这样的流式解决方案

apache-spark
1个回答
1
投票

有多种方法可以解决这个问题。

方法1:

您可以在Spark中使用Window操作

import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{col, desc, rank}

val filterWindow: WindowSpec = Window.partitionBy("id").orderBy(desc("amount"))

val df = ???

df.withColumn("temp_rank", rank().over(filterWindow))
.filter(col("temp_rank") === 1)
.drop("temp_rank")

这个问题是它不适用于结构化流,因为仅在TIMESTAMP列上支持窗口化。这适用于批处理作业。

方法2:

根据问题中的指定条件,您可以使用下面的内容。分组在id上完成,分组内容转换为Seq[A]。在这里,A代表Struct。然后过滤掉该Seq以进行记录。

object StreamingDeDuplication {

  case class SubRecord(time: java.sql.Timestamp, amount: Double)

  val subSchema: StructType = new StructType().add("time", TimestampType).add("amount", DoubleType)

  def deDupe: UserDefinedFunction =
    udf((data: Seq[Row]) => data.maxBy(_.getAs[Double]("amount")), subSchema)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("StreamingDeDuplication").getOrCreate()

    import spark.implicits._
    val records = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]
      .map(_.split(","))
      .withColumn("id", $"value".getItem(0).cast("STRING"))
      .withColumn("amount", $"value".getItem(1).cast("DOUBLE"))
      .withColumn("time", $"value".getItem(2).cast("TIMESTAMP"))
      .drop("value")

    val results = records
      .withColumn("temp", struct("time", "amount"))
      .groupByKey(a => a.getAs[String]("id"))
      .agg(collect_list("temp").as[Seq[SubRecord]])
      .withColumnRenamed("collect_list(temp)", "temp_agg")
      .withColumn("af", deDupe($"temp_agg"))
      .withColumn("amount", col("af").getField("amount"))
      .withColumn("time", col("af").getField("time"))
      .drop("af", "temp_agg")

    results
      .writeStream
      .outputMode(OutputMode.Update())
      .option("truncate", "false")
      .format("console")
      .start().awaitTermination()
  }

}
© www.soinside.com 2019 - 2024. All rights reserved.