Flink会话窗口,最终获得结果

问题描述 投票:0回答:1

我有一个kafka消息,类似于以下模式:

{ user: 'someUser', value: 'SomeValue' , timestamp:000000000}

使用Flink流计算对这些项目进行计数操作。

现在我要声明一个会话,以单个时间戳在X秒的范围内收集相同的用户+值,并带有最新的时间戳,然后仅将其转发到下一个流中一次]]

所以我写了这样的东西:

data.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Data>() {
        .....
    })
    .keyBy(new KeySelector<Data, String>(){

        .......
    })
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .aggregate(new AggregateFunction<Data, Data, Data>() {

        @Override
        public Data createAccumulator() {
            return null;
        }

        @Override
        public Data add(Data value, Data accumulator) {
            if(accumulator == null) {
                accumulator = value;
            }
            return accumulator;

        }

        @Override
        public Data getResult(Data accumulator) {
            return accumulator;
        }

        @Override
        public Data merge(Data a, Data b) {
            return a;
        }
   });

但是问题是getResult函数在每个元素上调用,而不仅仅是在窗口的末尾。

我的问题是如何在窗口结束之前不将聚合结果转发到下一个流。据我所知,即使没有结束窗口,当没有更多元素时,流程流的结果也在向前发展[]

任何建议?

谢谢

[我有一个类似以下模式的kafka消息:{用户:'someUser',值:'SomeValue',timestamp:000000000}使用Flink流计算,可以对这些项目进行人员计数。 ...

apache-flink flink-streaming
1个回答
0
投票

Flink提供了两种不同的评估窗口的方法。在这种情况下,您想使用另一个。

一种方法逐步评估每个窗口的内容。这是通过reduceaggregate获得的。将元素分配给窗口时,将调用ReduceFunctionAggregateFunction,并且该元素立即对最终结果做出贡献。

© www.soinside.com 2019 - 2024. All rights reserved.