SQL query equivalent of withWatermark in Spark Streaming

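I have a method that takes a Spark SQL query as a parameter and runs it on a streaming Dataset. The queries need to handle both window functions and withWatermark. The window function seems to be expressible in SQL, but I cannot find SQL syntax equivalent to withWatermark.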

  import java.sql.Timestamp
  import java.time.LocalDateTime
  import org.apache.spark.sql.functions.window
  import spark.implicits._ // enables toDF and the $"col" syntax

  val levels = spark.sparkContext.parallelize(Seq(
  // (year, month, dayOfMonth, hour, minute, second)
  ((2012, 12, 12, 12, 12, 12), 5),
  ((2012, 12, 12, 12, 12, 14), 9),
  ((2012, 12, 12, 13, 13, 14), 4),
  ((2016, 8, 13, 0, 0, 0), 10),
  ((2017, 5, 27, 0, 0, 0), 15))).
  map { case ((yy, mm, dd, h, m, s), a) => (LocalDateTime.of(yy, mm, dd, h, m, s), a) }.
  map { case (ts, a) => (Timestamp.valueOf(ts), a) }.
  toDF("time", "level")

  levels.createGlobalTempView("test")

  val apiResult = levels.select(window($"time", "5 seconds","2 seconds"), $"level")
  // Same window function expressed in SQL
  val sqlResult = spark.sql("select window(time, '5 seconds', '2 seconds'), level from global_temp.test")

  apiResult.show(false)
  sqlResult.show(false) // apiResult and sqlResult produce the same result

  // Need the equivalent SQL for the watermark + window aggregation below
  val windowedCountsApi = levels
  .withWatermark("time", "10 seconds")
  .groupBy(window($"time", "5 minutes", "2 minutes"), $"level")
  .count()

  windowedCountsApi.show(truncate = false)

For simplicity, I tested this with an ordinary (batch) Dataset rather than a stream. Any help is much appreciated.
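One possible direction, sketched here under assumptions rather than offered as a confirmed answer: as far as I know, open-source Spark SQL did not offer a watermark clause at the time of this question, so the watermark can be declared once with the DataFrame API before the data is registered as a view, and the SQL string passed to the method then only needs the windowed aggregation. This relies on the watermark being part of the view's logical plan. The object name `WatermarkSqlSketch`, the view name `levels_with_watermark`, and the aliases `time_window` and `cnt` are hypothetical, and the sample rows are a shortened stand-in for `levels`.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, SparkSession}

object WatermarkSqlSketch {

  // Registers a watermarked view, then runs the windowed count as plain SQL.
  def windowedCounts(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Shortened stand-in for the `levels` data in the question.
    val levels = Seq(
      (Timestamp.valueOf("2012-12-12 12:12:12"), 5),
      (Timestamp.valueOf("2012-12-12 12:12:14"), 9),
      (Timestamp.valueOf("2012-12-12 13:13:14"), 4)
    ).toDF("time", "level")

    // Step 1: declare the watermark with the API and expose the result as a view.
    // On a batch Dataset the watermark is ignored; on a stream it bounds late data.
    levels
      .withWatermark("time", "10 seconds")
      .createOrReplaceTempView("levels_with_watermark") // hypothetical view name

    // Step 2: the aggregation is ordinary SQL over that view.
    spark.sql(
      """SELECT window(time, '5 minutes', '2 minutes') AS time_window,
        |       level,
        |       count(*) AS cnt
        |FROM levels_with_watermark
        |GROUP BY window(time, '5 minutes', '2 minutes'), level""".stripMargin)
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; in spark-shell `spark` already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("watermark-sql-sketch")
      .getOrCreate()

    windowedCounts(spark).show(truncate = false)
    spark.stop()
  }
}
```

With this split, the method that accepts a SQL string can stay unchanged; only the code that registers the view needs to know the event-time column and the allowed delay. Whether that fits depends on the caller being able to register the view before the query runs.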

apache-spark apache-spark-sql spark-streaming