我有一个流媒体用例,我想在 Flink 中实现一个自定义窗口函数,其中窗口启动是基于包含特定值的事件发生的。然后,窗口将运行 15 秒,收集 15 秒内进入的所有事件并关闭窗口。我想聚合之后的所有窗口事件。 我检查了 Flink 窗口函数,似乎没有一个能解决我的用例。
outputStream = ewSubStream.keyBy(new KeySelector())
.window(new SessionWindowAssigner(15000))
.trigger(new SessionWindowTrigger())
.aggregate(new SessionModelAggregator())
.map(new SessionModelEvaluator());
我尝试了上面的方法,其中每个窗口、触发器和聚合函数都是自定义的。似乎不起作用。 我也尝试过这个方法:
outputStream = ewSubStream.keyBy(new KeySelector())
.window(TumblingEventTimeWindows.of(Time.seconds(15))) // tried with .window(EventTimeSessionWindows.withGap(Time.seconds(15))) as well
.trigger(new SessionWindowTrigger())
.aggregate(new SessionModelAggregator())
.map(new SessionModelEvaluator());
我没有看到任何异常,但触发函数中没有打印任何日志。 我的触发函数如下所示:
@Override
public TriggerResult onElement(Element element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
Map<String, Object> data = element.getData();
log.info("elementtrigger: {}", element);
if (data.containsKey("key") &&
data.get("my_record_key").toString().equals("active")) {
log.info("firing: {}", element);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
基本上,我想在 my_record_key=active 时触发/启动窗口并运行窗口 15 秒并停止此窗口会话。
当我运行此应用程序时,它不会触发我的窗口函数,也不会运行
trigger
函数。
任何想法/想法都有帮助!
我只会使用
KeyedProcessFunction
,在其中注册一个计时器,并在使用触发数据收集开始的“特殊事件”调用 KeyedProcessFunction.processElement()
方法后创建状态。您的状态将是您聚合结果所需的任何状态。
当
KeyedProcessFunction.processElement()
被调用时,并且你有状态(这意味着你已经看到了特殊事件),那么你就可以进行聚合。
当调用
KeyedProcessFunction.onTimer()
时,输出聚合结果,并清除状态。