我有一个kafka消息,类似于以下模式:
{ user: 'someUser', value: 'SomeValue' , timestamp:000000000}
使用Flink流计算对这些项目进行计数操作。
现在我要声明一个会话,以单个时间戳在X秒的范围内收集相同的用户+值,并带有最新的时间戳,然后仅将其转发到下一个流中一次]]
所以我写了这样的东西:
data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Data>() { ..... }) .keyBy(new KeySelector<Data, String>(){ ....... }) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .aggregate(new AggregateFunction<Data, Data, Data>() { @Override public Data createAccumulator() { return null; } @Override public Data add(Data value, Data accumulator) { if(accumulator == null) { accumulator = value; } return accumulator; } @Override public Data getResult(Data accumulator) { return accumulator; } @Override public Data merge(Data a, Data b) { return a; } });
但是问题是getResult函数在每个元素上调用,而不仅仅是在窗口的末尾。
我的问题是如何在窗口结束之前不将聚合结果转发到下一个流。据我所知,即使没有结束窗口,当没有更多元素时,流程流的结果也在向前发展[]
任何建议?
谢谢
[我有一个类似以下模式的kafka消息:{用户:'someUser',值:'SomeValue',timestamp:000000000}使用Flink流计算,可以对这些项目进行人员计数。 ...
Flink提供了两种不同的评估窗口的方法。在这种情况下,您想使用另一个。
一种方法逐步评估每个窗口的内容。这是通过reduce
和aggregate
获得的。将元素分配给窗口时,将调用ReduceFunction
或AggregateFunction
,并且该元素立即对最终结果做出贡献。