Flink Kinesis Consumer 上的水印策略

问题描述 投票:0回答:1

我正在研究使用 Flink 和 Kinesis 流作为源。我想使用事件时间水印。 计划在 AWS 托管的 Flink (Kinesis Analytics) 平台上运行它。

查看AWS文档和确实Flink文档建议使用FlinkKinesisConsumer。
要在此消费者上启用 EventTime,我发现建议使用自定义

AssignerWithPeriodicWatermarks()
并使用
KinesisConsumer
将其设置在
setPeriodicWatermarkAssigner
上。

但是,我还在 Flink 文档中看到该 API 已弃用,建议使用 WatermarkStrategies。

我的问题:

  • 是否可以在 kinesis 使用者上使用 WatermarkStrategy,或者必须在 DataStream 本身的非源操作之后应用它(flink 文档中不鼓励)?
  • 如果不可能并且必须在非源操作之后使用这是什么意思?为什么会气馁呢?工作负载的性能如何
  • 还是建议继续使用已弃用的API?
  • 或者是否还有其他可以推荐的 kinesis flink 消费者

提前感谢您的任何建议

亚历克西斯

java apache-flink amazon-kinesis amazon-kinesis-analytics
1个回答
0
投票

不可能直接在

FlinkKinesisConsumer
上设置它(除了,正如您所指出的,通过使用已弃用的
AssignerWithPeriodicWatermarks
接口)或
SourceFunction
的任何其他实现,但是您可以在获得水印后立即设置水印数据流。

从评论来看,我认为你已经明白了,但这就是我正在做的。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

// Set up
var env = StreamExecutionEnvironment.getExecutionEnvironment();
var consumerProperties = ...;
var deserializer = ...;
var kinesisConsumer = new FlinkKinesisConsumer<MyDataType>(kinesisStreamName, deserializer, consumerProperties);

// Pick a watermark strategy.
// Use some timestamp field from your event object.
var strategy = WatermarkStrategy.forMonotonousTimestamps<MyDataType>()
                                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

// Start reading from Kinesis
var dataStream = env.addSource(kinesisConsumer)
                    .assignTimestampsAndWatermarks(strategy);

您必须根据您的用例进行调整。显然将

MyDataType
替换为您的流中实际存在的内容,但还要注意
forMonotonousTimestamps
可能不适合您的用例,而
forBoundedOutOfOrderness
可能效果更好。

© www.soinside.com 2019 - 2024. All rights reserved.