Flink - 何时不注入水印?用于开窗

问题描述 投票:0回答:1

我了解到,任何流数据的窗口化都使用水印作为边界


在以下answer的代码中,生成的流中没有注入水印(

fromElements()
)

case class Record( key: String, value: Int )

object Job extends App
{
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
  val step1 = data.filter( record => record.value % 3 != 0  ) // introduces some data loss
  val step2 = data.map( r => Record( r.key, r.value * 2 ) )
  val step3 = data.map( r => Record( r.key, r.value * 3 ) )
  val merged = step1.union( step2, step3 )
  val keyed = merged.keyBy(0)
  val windowed = keyed.timeWindow( Time.milliseconds( 50 ) )
  val triggered = windowed.trigger( new CountTriggerWithTimeout( 3, env.getStreamTimeCharacteristic ) )
  val summed = triggered.sum( 1 )
  summed.print()
  env.execute("test")
}


在下面的代码中,

kafka事件已打水印:

env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

问题: 什么时候注入水印?什么时候不可以

java scala apache-flink flink-streaming
1个回答
0
投票

事件时间窗口总是需要水印。

在您用

fromElements
提到的情况下,看起来没有水印,但实际上有。每当有界源(例如
fromElements
)到达其输入末尾时,它都会生成一个值为 MAX_WATERMARK 的最终水印。这会导致所有待处理的计时器被触发,从而关闭所有打开的窗口。

这样做是为了提供批处理作业所需的语义,即它们在消耗完所有输入后生成结果。

© www.soinside.com 2019 - 2024. All rights reserved.