如何关闭空闲流的flink事件时间窗口?

问题描述 投票:0回答:1

当前使用 Flink 事件时间,但有时流会空闲,在此期间我希望窗口关闭,以便数据得到输出,而不是等待另一个事件通过。我可以在流空闲时添加定期水印吗?我尝试使用AssignerWithPeriodicWatermarks,但现在已弃用,当前发出周期性水印的方法是什么?该文档是垃圾,无法让我了解如何实现它。这是我当前设置的来源:

```val SomeStream = env
  .addSource(new FlinkKafkaConsumer011[String](Pattern.compile(config.getInputFaultStream), new SimpleStringSchema(), properties))
  .map(x => JsonUtil.fromJson[SomeInput](x) match {
    case Success(value) => value
    case Failure(f) => null
  })
  .filter(x => x != null && x.timestamp != null && x.faultStatus != null && fixDate.isInstant(x.timestamp))
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness(Duration.ofSeconds(60))
      .withIdleness(Duration.ofSeconds(300))
      .withTimestampAssigner(new SerializableTimestampAssigner[SomeInput] {
        override def extractTimestamp(element: SomeInput, recordTimestamp: Long): Long = fixDate.makeInstant(element.timestamp).toEpochMilli
      }))
  .name("ReadFaultEventInput")```
window aggregate apache-flink watermark eventtrigger
1个回答
0
投票

这种情况(所有分区都完全空闲)可能很难处理。您使用的

withIdleness
方法仅在至少一个分区上仍然有消息时才有效。

如果您可以安排保持活动消息,这可能是在缺乏真实事件流量的情况下保持水印前进的好方法。

否则,您可以实施水印策略,该策略使用处理时间计时器来推进水印,尽管缺少事件。这可能有点冒险,因为它无法区分中断和安静期。还有一些风险是,当活动恢复时,有些活动会迟到,因为它们落后于人为的高级水印。

这里是如何使用旧水印界面执行此操作的示例。恐怕我没有一个示例来展示如何使用较新的 WatermarkStrategy 界面执行此操作。也许理解其工作原理的最佳方法是阅读代码。您可以从https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/BoundedOutOfOrdernessWatermarks.java开始。

© www.soinside.com 2019 - 2024. All rights reserved.