我正在研究使用 Flink 和 Kinesis 流作为源。我想使用事件时间水印。 计划在 AWS 托管的 Flink (Kinesis Analytics) 平台上运行它。
查看AWS文档和确实Flink文档建议使用FlinkKinesisConsumer。
要在此消费者上启用 EventTime,我发现建议使用自定义
AssignerWithPeriodicWatermarks()
并使用 KinesisConsumer
将其设置在 setPeriodicWatermarkAssigner
上。
但是,我还在 Flink 文档中看到该 API 已弃用,建议使用 WatermarkStrategies。
我的问题:
提前感谢您的任何建议
亚历克西斯
不可能直接在
FlinkKinesisConsumer
上设置它(除了,正如您所指出的,通过使用已弃用的 AssignerWithPeriodicWatermarks
接口)或 SourceFunction
的任何其他实现,但是您可以在获得水印后立即设置水印数据流。
从评论来看,我认为你已经明白了,但这就是我正在做的。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
// Set up
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var consumerProperties = ...;
var deserializer = ...;
var kinesisConsumer = new FlinkKinesisConsumer<MyDataType>(kinesisStreamName, deserializer, consumerProperties);
// Pick a watermark strategy.
// Use some timestamp field from your event object.
var strategy = WatermarkStrategy.forMonotonousTimestamps<MyDataType>()
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
// Start reading from Kinesis
var dataStream = env.addSource(kinesisConsumer)
.assignTimestampsAndWatermarks(strategy);
您必须根据您的用例进行调整。显然将
MyDataType
替换为您的流中实际存在的内容,但还要注意 forMonotonousTimestamps
可能不适合您的用例,而 forBoundedOutOfOrderness
可能效果更好。