Sliding window in Apache Flink emits results before the window is complete


I am trying to implement a sliding window using Apache Flink in Java. I have been referring to the following documentation:

Windows | Apache Flink

As per my use case, I receive an incoming stream of data from Kafka. The stream essentially contains the speed of a user, captured from the user's device. My task is to compute the user's average speed over the last 10 minutes, and it needs to be calculated every minute. So I am using a sliding window of size 10 minutes with a slide interval of 1 minute. Since I am in the initial phase of development, I just want to print the results to the console.

Suppose I start my Flink job at 11:00. My first output should be produced at 11:10, then at 11:11, 11:12, and so on.

However, the first output was printed to the console at 11:01, followed by an output every minute thereafter.

Below is my code:

package org.example;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.Schema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class DEStationarySample {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableForceAvro();
        env.setParallelism(2);

        // Creating AVRO Schema for Rider Location Events
        final Schema RiderLocationSchema = new org.apache.avro.Schema.Parser().parse("avro_schema");

        // Creating Rider Location Events Stream
        KafkaSource<GenericRecord> riderLocationEventsSource = KafkaSource.<GenericRecord>builder()
                .setBootstrapServers("bootstrapServers")
                .setGroupId("consumer_group_id")
                .setTopics("kafka_topic")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forGeneric(RiderLocationSchema, "schema_registry_url")
                )
                .build();

        DataStream<GenericRecord> riderLocationEventsStream = env.fromSource(
                riderLocationEventsSource,
                WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                        .withTimestampAssigner((genericRecord, timestamp) -> (long)genericRecord.get("eventTime")),
                "Kafka Source"
        );

        DataStream<Map<String, Object>> outputResult = riderLocationEventsStream.keyBy(data -> data.get("tripId"))
                .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
                .aggregate(new AggregateFunction<GenericRecord, Map<String, Object>, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> createAccumulator() {
                        Map<String, Object> m = new HashMap<>();
                        m.put("tripId",null);
                        m.put("firstEventTime",0L);
                        m.put("lastEventTime",0L);
                        m.put("speed",0.0);
                        m.put("count",0);

                        return m;
                    }

                    @Override
                    public Map<String, Object> add(GenericRecord genericRecord, Map<String, Object> accumulator) {

                        long firstEventTime = (long)accumulator.get("firstEventTime");
                        if(firstEventTime == 0L){
                            accumulator.put("firstEventTime", genericRecord.get("eventTime"));
                        }
                        accumulator.put("lastEventTime", genericRecord.get("eventTime"));
                        accumulator.put("tripId", genericRecord.get("tripId"));
                        accumulator.put("speed", (double)accumulator.get("speed") + (double)genericRecord.get("speed"));
                        accumulator.put("count", (int)accumulator.get("count") +1);
                        return accumulator;
                    }

                    @Override
                    public Map<String, Object> getResult(Map<String, Object> accumulator) {
                        
                        double speed = (double)accumulator.get("speed");
                        int count = (int)accumulator.get("count");
                        String tripId = accumulator.get("tripId").toString();
                        
                        Map<String, Object> result = new HashMap<>();
                        result.put("tripId",tripId);
                        result.put("averageSpeed",speed/count);
                        return result;
                    }

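                    // merge() is only invoked by merging window assigners such as
                    // session windows; sliding windows never call it, so returning
                    // null below happens to be harmless here, though merging the
                    // two accumulators would be the safer implementation.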
                    @Override
                    public Map<String, Object> merge(Map<String, Object> stringObjectMap, Map<String, Object> acc1) {
                        return null;
                    }
                });
        outputResult.print();
        env.execute("Sample Processing");
    }
}

The input Kafka stream looks like this:

{
  "eventName": "RIDER_LOCATION_ACTIVITY",
  "eventType": "RIDER_LOCATION_UPDATED",
  "scmVersion": 1,
  "eventTime": 1706887297150,
  "correlationId": "1b27f143-f22f-4642-bb24-f8c3d18f35c8-652-681",
  "riderId": "999999",
  "lat": 12.876188,
  "lng": 77.60885,
  "battery": 69,
  "network": "LTE",
  "capturedAt": 1706887297977,
  "speed": 0.0027558245,
  "orderId": "",
  "orderStatus": "",
  "tripId": "trip_1234342",
  "employerReferenceId": "EMP12345"
}

Every event is guaranteed to have a tripId associated with it.

I tried changing the slide interval, and I observed that the first output is produced as soon as one slide interval has elapsed; however, results are still produced before the full window size is reached. I am new to Flink, so any help would be appreciated.

java apache-flink complex-event-processing
1 Answer

"Suppose I start my Flink job at 11:00. My first output should be produced at 11:10, then at 11:11, 11:12, and so on."

That is not the case, because your watermark assigner is based on eventTime, not on processing time. That is also what you want; otherwise you would get different results every time you started the job.
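As a rough, self-contained sketch of those mechanics (a conceptual model only, not Flink's internal API; the class name is just for the demo and the constants are taken from the question): with forBoundedOutOfOrderness(Duration.ofSeconds(60)) the watermark trails the highest event timestamp seen by the bound, and an event-time window [start, end) fires only once the watermark reaches end - 1, regardless of the wall clock.

import java.time.Instant;

public class WatermarkFiringSketch {
    public static void main(String[] args) {
        long outOfOrdernessMs = 60_000L;         // forBoundedOutOfOrderness(Duration.ofSeconds(60))
        long maxEventTimeSeen = 1706887297150L;  // the sample event's eventTime

        // BoundedOutOfOrdernessWatermarks emits (max event timestamp seen - bound - 1).
        long watermark = maxEventTimeSeen - outOfOrdernessMs - 1;

        // EventTimeTrigger fires a window [start, end) once watermark >= end - 1,
        // so firing depends purely on the event time carried by the data.
        long windowEnd = 1706887320000L; // 15:22:00 UTC, end of the earliest window (see the next snippet)
        System.out.println("watermark    = " + Instant.ofEpochMilli(watermark));
        System.out.println("window fires = " + (watermark >= windowEnd - 1));
    }
}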

So the event time and the watermark are determined by the first eventTime that is read. If that is 1706887297150, which equals 15:21:37.150 UTC (on 2024-02-02), it determines which windows the event falls into. Sliding event-time windows in Flink are aligned to the epoch, not to the start of the job: this event belongs to the ten overlapping windows [15:12, 15:22), [15:13, 15:23), ..., [15:21, 15:31), and the earliest of these ends less than a minute after the event. As soon as the watermark passes 15:22:00 plus your 60-second out-of-orderness bound, that first, only partially filled window fires, which is why you see output roughly one slide interval after starting the job rather than after a full 10 minutes.
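To make the alignment concrete, here is a minimal standalone sketch (plain Java, no Flink dependency; the class name is illustrative, and the windowStart helper mirrors the arithmetic of Flink's TimeWindow.getWindowStartWithOffset with a zero offset, for non-negative timestamps) that enumerates the windows covering the sample event:

import java.time.Instant;

public class WindowAlignmentSketch {

    // Epoch-aligned window start, as computed by
    // TimeWindow.getWindowStartWithOffset(timestamp, 0, slide).
    static long windowStart(long timestamp, long slide) {
        return timestamp - (timestamp % slide);
    }

    public static void main(String[] args) {
        long size  = 10 * 60 * 1000L;    // 10-minute window
        long slide = 60 * 1000L;         // 1-minute slide
        long eventTime = 1706887297150L; // 2024-02-02 15:21:37.150 UTC

        // With size/slide = 10, each event is assigned to 10 overlapping windows.
        for (long start = windowStart(eventTime, slide); start > eventTime - size; start -= slide) {
            System.out.printf("window [%s, %s)%n",
                    Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + size));
        }
        // The earliest window, [15:12:00, 15:22:00), ends only about 23 seconds
        // after the event itself, hence the first (partial) result about one
        // slide interval into the job.
    }
}

Running it prints the ten window ranges; note that none of them is anchored at the time the job was started.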
