这个问题在这里已有答案:
如何在不使用order by
或mapGroupWithState
的情况下为spark structure 2.3.0中的每个组选择最大行?
输入:
id | amount | my_timestamp
-------------------------------------------
1 | 5 | 2018-04-01T01:00:00.000Z
1 | 10 | 2018-04-01T01:10:00.000Z
2 | 20 | 2018-04-01T01:20:00.000Z
2 | 30 | 2018-04-01T01:25:00.000Z
2 | 40 | 2018-04-01T01:30:00.000Z
预期产出:
id | amount | my_timestamp
-------------------------------------------
1 | 10 | 2018-04-01T01:10:00.000Z
2 | 40 | 2018-04-01T01:30:00.000Z
寻找使用像sparkSession.sql("sql query")
或类似于原始sql的原始sql但不像mapGroupWithState
这样的流式解决方案
有多种方法可以解决这个问题。
方法1:
您可以在Spark中使用Window操作
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions.{col, desc, rank}
val filterWindow: WindowSpec = Window.partitionBy("id").orderBy(desc("amount"))
val df = ???
df.withColumn("temp_rank", rank().over(filterWindow))
.filter(col("temp_rank") === 1)
.drop("temp_rank")
这个问题是它不适用于结构化流,因为仅在TIMESTAMP
列上支持窗口化。这适用于批处理作业。
方法2:
根据问题中的指定条件,您可以使用下面的内容。分组在id
上完成,分组内容转换为Seq[A]
。在这里,A
代表Struct
。然后过滤掉该Seq以进行记录。
object StreamingDeDuplication {
case class SubRecord(time: java.sql.Timestamp, amount: Double)
val subSchema: StructType = new StructType().add("time", TimestampType).add("amount", DoubleType)
def deDupe: UserDefinedFunction =
udf((data: Seq[Row]) => data.maxBy(_.getAs[Double]("amount")), subSchema)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").appName("StreamingDeDuplication").getOrCreate()
import spark.implicits._
val records = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
.as[String]
.map(_.split(","))
.withColumn("id", $"value".getItem(0).cast("STRING"))
.withColumn("amount", $"value".getItem(1).cast("DOUBLE"))
.withColumn("time", $"value".getItem(2).cast("TIMESTAMP"))
.drop("value")
val results = records
.withColumn("temp", struct("time", "amount"))
.groupByKey(a => a.getAs[String]("id"))
.agg(collect_list("temp").as[Seq[SubRecord]])
.withColumnRenamed("collect_list(temp)", "temp_agg")
.withColumn("af", deDupe($"temp_agg"))
.withColumn("amount", col("af").getField("amount"))
.withColumn("time", col("af").getField("time"))
.drop("af", "temp_agg")
results
.writeStream
.outputMode(OutputMode.Update())
.option("truncate", "false")
.format("console")
.start().awaitTermination()
}
}