Global Window自定义触发器上的allowedLateness

问题描述 投票:1回答:1

我为事件流创建了自定义触发器和处理功能。

DataStream<DynamoDBRow> dynamoDBRows =
    sensorEvents
        .keyBy("id")
        .window(GlobalWindows.create())
        .trigger(new MyCustomTrigger())
        .allowedLateness(Time.minutes(1)) # Note
        .process(new MyCustomWindowProcessFunction());

我的触发器基于事件参数。接收到事件结束信号后,MyCustomWindowProcessFunction()将应用于窗口元素。

@Slf4j
public class MyCustomTrigger extends Trigger<SensorEvent, GlobalWindow> {

  @Override
  public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

    if (element.isEventEnd() == true) {
      return TriggerResult.FIRE_AND_PURGE;
    }

    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}

可能只有很少的传感器数据,即使在触发之后也可能会出现。因此,我添加了.allowedLateness(Time.minutes(1)),以确保在处理时不会错过这些事件。

就我而言,allowedLateness不起作用。

浏览文档后,我发现了这个

allowedLateness is not applicable for Global Window

如何在GlobalWindow中包含allowedLateness

:我也尝试设置环境时间特性

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
java apache-flink amazon-kinesis amazon-kinesis-analytics
1个回答
0
投票

老实说,我看不出在这里使用GlobalWindow的原因。您可以只使用与KeyedProcessFunction的目的相同的Trigger,基本上,它将把从事件开始到事件结束的所有元素收集到ListState中,然后当您收到isEventEnd()==true,您只需安排EventTime计时器,该计时器将在一分钟后触发并发出在ListState内部收集的结果。

© www.soinside.com 2019 - 2024. All rights reserved.