我了解到,任何流数据的窗口化都使用水印作为边界
在以下answer的代码中,生成的流中没有注入水印(
fromElements()
)
case class Record( key: String, value: Int )
object Job extends App
{
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
val step1 = data.filter( record => record.value % 3 != 0 ) // introduces some data loss
val step2 = data.map( r => Record( r.key, r.value * 2 ) )
val step3 = data.map( r => Record( r.key, r.value * 3 ) )
val merged = step1.union( step2, step3 )
val keyed = merged.keyBy(0)
val windowed = keyed.timeWindow( Time.milliseconds( 50 ) )
val triggered = windowed.trigger( new CountTriggerWithTimeout( 3, env.getStreamTimeCharacteristic ) )
val summed = triggered.sum( 1 )
summed.print()
env.execute("test")
}
在下面的代码中,
kafka事件已打水印:
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
问题: 什么时候注入水印?什么时候不可以
事件时间窗口总是需要水印。
在您用
fromElements
提到的情况下,看起来没有水印,但实际上有。每当有界源(例如 fromElements
)到达其输入末尾时,它都会生成一个值为 MAX_WATERMARK 的最终水印。这会导致所有待处理的计时器被触发,从而关闭所有打开的窗口。
这样做是为了提供批处理作业所需的语义,即它们在消耗完所有输入后生成结果。