Flink 窗口关闭时间

问题描述 投票:0回答:1

代码如下。根据我的理解,当水印大于窗口结束时间时,窗口将被关闭。但我的测试结果很混乱。

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> ds = env.socketTextStream("192.168.10.100", 9999);

        SingleOutputStreamOperator<T> map = ds.map(x -> {
            String[] s = x.split(" ");
            return new T(s[0], Integer.parseInt(s[1]));
        });
        WatermarkStrategy<T> wms = WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((t, l) -> {
                    LocalDateTime now = LocalDateTime.now().minusSeconds(10);
                    Date from = Date.from(now.atZone(ZoneId.systemDefault()).toInstant());
                    return from.getTime();
                });

        map.assignTimestampsAndWatermarks(wms)
                .keyBy(T::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<T, List<T>, String, TimeWindow>() {
                    @Override
                    public void process(String s,
                                        ProcessWindowFunction<T, List<T>, String, TimeWindow>.Context context,
                                        Iterable<T> elements, Collector<List<T>> out) throws Exception {
                        System.out.println(context.window().getStart() + "----" + context.window().getEnd() + "----" + context.window().maxTimestamp());
                        List<T> ts = new ArrayList<>();
                        for (T element : elements) {
                            ts.add(element);
                        }

                        out.collect(ts);
                    }
                }).print();
        env.execute();
    }
}

测试结果

1702991405520 1702991406914 1702991408207 1702991409720 1702991411758 1702991413702 1702991414803 1702991418426 1702991419629 1702991420617 1702991421944 1702991423171 1702991424718 1702991426117 1702991427736 1702991429248 1702991405000----1702991410000----1702991409999

窗口是1702991405000----1702991410000,但是为什么当水位达到1702991429248时窗口会关闭并调用process方法呢?

请帮我理解

java apache-flink
1个回答
0
投票

窗口使用的实际时间戳和水印将源自

LocalDateTime.now().minusSeconds(10)
,而不是来自提供给套接字的输入。

我怀疑1702991429248不是水印的值。

© www.soinside.com 2019 - 2024. All rights reserved.