I am trying to implement a sliding window using Apache Flink in Java. I have been referring to the following document:
In my use case, I receive an incoming stream of data from Kafka. The stream essentially contains user speeds captured from users' devices. My task is to compute each user's average speed over the last 10 minutes, recalculated every minute. So I am using a sliding window with a size of 10 minutes and a slide interval of 1 minute. Since I am in the early stages of development, I simply want to print the results to the console.
Suppose I start my Flink job at 11:00. I expect the first output to be produced at 11:10, then at 11:11, 11:12, and so on.
However, the first output is printed to the console at 11:01, with subsequent outputs every minute after that.
Below is my code:
package org.example;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class DEStationarySample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableForceAvro();
        env.setParallelism(2);

        // Creating AVRO Schema for Rider Location Events
        final Schema RiderLocationSchema = new Schema.Parser().parse("avro_schema");

        // Creating Rider Location Events Stream
        KafkaSource<GenericRecord> riderLocationEventsSource = KafkaSource.<GenericRecord>builder()
                .setBootstrapServers("bootstrapServers")
                .setGroupId("consumer_group_id")
                .setTopics("kafka_topic")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(
                        ConfluentRegistryAvroDeserializationSchema.forGeneric(RiderLocationSchema, "schema_registry_url"))
                .build();

        DataStream<GenericRecord> riderLocationEventsStream = env.fromSource(
                riderLocationEventsSource,
                WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                        .withTimestampAssigner((genericRecord, timestamp) -> (long) genericRecord.get("eventTime")),
                "Kafka Source"
        );

        DataStream<Map<String, Object>> outputResult = riderLocationEventsStream
                .keyBy(data -> data.get("tripId"))
                .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1)))
                .aggregate(new AggregateFunction<GenericRecord, Map<String, Object>, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> createAccumulator() {
                        Map<String, Object> m = new HashMap<>();
                        m.put("tripId", null);
                        m.put("firstEventTime", 0L);
                        m.put("lastEventTime", 0L);
                        m.put("speed", 0.0);
                        m.put("count", 0);
                        return m;
                    }

                    @Override
                    public Map<String, Object> add(GenericRecord genericRecord, Map<String, Object> accumulator) {
                        long firstEventTime = (long) accumulator.get("firstEventTime");
                        if (firstEventTime == 0L) {
                            accumulator.put("firstEventTime", genericRecord.get("eventTime"));
                        }
                        accumulator.put("lastEventTime", genericRecord.get("eventTime"));
                        accumulator.put("tripId", genericRecord.get("tripId"));
                        accumulator.put("speed", (double) accumulator.get("speed") + (double) genericRecord.get("speed"));
                        accumulator.put("count", (int) accumulator.get("count") + 1);
                        return accumulator;
                    }

                    @Override
                    public Map<String, Object> getResult(Map<String, Object> accumulator) {
                        double speed = (double) accumulator.get("speed");
                        int count = (int) accumulator.get("count");
                        String tripId = accumulator.get("tripId").toString();
                        Map<String, Object> result = new HashMap<>();
                        result.put("tripId", tripId);
                        result.put("averageSpeed", speed / count);
                        return result;
                    }

                    @Override
                    public Map<String, Object> merge(Map<String, Object> stringObjectMap, Map<String, Object> acc1) {
                        return null;
                    }
                });

        outputResult.print();
        env.execute("Sample Processing");
    }
}
The input Kafka stream looks like this:
{
  "eventName": "RIDER_LOCATION_ACTIVITY",
  "eventType": "RIDER_LOCATION_UPDATED",
  "scmVersion": 1,
  "eventTime": 1706887297150,
  "correlationId": "1b27f143-f22f-4642-bb24-f8c3d18f35c8-652-681",
  "riderId": "999999",
  "lat": 12.876188,
  "lng": 77.60885,
  "battery": 69,
  "network": "LTE",
  "capturedAt": 1706887297977,
  "speed": 0.0027558245,
  "orderId": "",
  "orderStatus": "",
  "tripId": "trip_1234342",
  "employerReferenceId": "EMP12345"
}
It is guaranteed that every event has a tripId associated with it.
I tried changing the slide interval, and I observed that the first output is produced as soon as one slide interval has elapsed. However, results are still produced before the window size is reached. I am new to Flink; any help would be appreciated.
Suppose I start my Flink job at 11:00. I expect the first output to be produced at 11:10, then at 11:11, 11:12, and so on.
That is not the case, because your watermark assigner is based on eventTime, not on processing time. This is also what you want; otherwise you would get different results every time you started the job.
So the event time and the watermark are determined by the first eventTime that is read. If that is 1706887297150, which corresponds to 15:21:37.150 UTC, then that timestamp determines where the window times start.
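To make this concrete: Flink aligns sliding windows to the epoch (in steps of the slide interval) and assigns each event to size/slide overlapping windows. The sketch below is a plain-Java reproduction of that assignment arithmetic for your window parameters, assuming a zero window offset; the constants and class name are illustrative, not part of Flink's API. Run it for the sample eventTime above and you will see the event falls into 10 windows, the earliest of which ends only seconds after the event itself — which is why the first result fires roughly one slide interval after the job starts, not after a full 10 minutes.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {
    static final long SIZE = 10 * 60 * 1000L; // window size: 10 minutes, in ms
    static final long SLIDE = 60 * 1000L;     // slide interval: 1 minute, in ms

    // Start of the latest slide-aligned window containing the timestamp
    // (epoch-aligned, zero offset).
    static long lastWindowStart(long timestamp) {
        return timestamp - (timestamp % SLIDE);
    }

    // All [start, end) windows the event belongs to: walk backwards in
    // slide-sized steps while the window still covers the timestamp.
    static List<long[]> assignWindows(long timestamp) {
        List<long[]> windows = new ArrayList<>();
        for (long start = lastWindowStart(timestamp); start > timestamp - SIZE; start -= SLIDE) {
            windows.add(new long[]{start, start + SIZE});
        }
        return windows;
    }

    public static void main(String[] args) {
        long eventTime = 1706887297150L; // 2024-02-02T15:21:37.150Z, from the sample event
        List<long[]> windows = assignWindows(eventTime);
        System.out.println("Event belongs to " + windows.size() + " windows:");
        for (long[] w : windows) {
            // Earliest window is [15:12:00Z, 15:22:00Z) — it ends ~23 s after the event.
            System.out.println(Instant.ofEpochMilli(w[0]) + " -> " + Instant.ofEpochMilli(w[1]));
        }
    }
}
```

Each of those 10 windows fires once the watermark passes its end, so the earliest ones close with only a partial 10 minutes of data in them; if you want to suppress results until a window has seen the full 10 minutes, you have to filter them out yourself (e.g. by comparing firstEventTime against the window start in a ProcessWindowFunction).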