Flink 中的自定义窗口函数

问题描述 投票:0回答:1

我有一个流媒体用例,我想在 Flink 中实现一个自定义窗口函数,其中窗口启动是基于包含特定值的事件发生的。然后,窗口将运行 15 秒,收集 15 秒内进入的所有事件并关闭窗口。我想聚合之后的所有窗口事件。 我检查了 Flink 窗口函数,似乎没有一个能解决我的用例。

outputStream = ewSubStream.keyBy(new KeySelector())
    .window(new SessionWindowAssigner(15000))
    .trigger(new SessionWindowTrigger())
    .aggregate(new SessionModelAggregator())
    .map(new SessionModelEvaluator());

我尝试了上面的方法,其中每个窗口、触发器和聚合函数都是自定义的。似乎不起作用。 我也尝试过这个方法:

outputStream = ewSubStream.keyBy(new KeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(15))) // tried with .window(EventTimeSessionWindows.withGap(Time.seconds(15))) as well
    .trigger(new SessionWindowTrigger())
    .aggregate(new SessionModelAggregator())
    .map(new SessionModelEvaluator());

我没有看到任何异常,但触发函数中没有打印任何日志。 我的触发函数如下所示:

@Override
public TriggerResult onElement(Element element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
  Map<String, Object> data = element.getData();
  log.info("elementtrigger: {}", element);
  if (data.containsKey("key") &&
      data.get("my_record_key").toString().equals("active")) {
    log.info("firing: {}", element);
    return TriggerResult.FIRE;
  }
  return TriggerResult.CONTINUE;
}

基本上,我想在 my_record_key=active 时触发/启动窗口并运行窗口 15 秒并停止此窗口会话。

当我运行此应用程序时,它不会触发我的窗口函数,也不会运行

trigger
函数。

任何想法/想法都有帮助!

apache-flink flink-streaming
1个回答
0
投票

我只会使用

KeyedProcessFunction
,在其中注册一个计时器,并在使用触发数据收集开始的“特殊事件”调用
KeyedProcessFunction.processElement()
方法后创建状态。您的状态将是您聚合结果所需的任何状态。

KeyedProcessFunction.processElement()
被调用时,并且你有状态(这意味着你已经看到了特殊事件),那么你就可以进行聚合。

当调用

KeyedProcessFunction.onTimer()
时,输出聚合结果,并清除状态。

© www.soinside.com 2019 - 2024. All rights reserved.