Flink 应用程序未接收和处理 Kinesis 连接器关闭时生成的事件

问题描述 投票:0回答:3

问题:Flink 应用程序无法接收和处理 Kinesis 连接器关闭时(由于重新启动)生成的事件

我们有以下 Flink 环境设置

env.enableCheckpointing(1000ms);
env.setStateBackend(new RocksDBStateBackend("file:///<filelocation>", true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(pause); 
env.getCheckpointConfig().setCheckpointTimeout(timeOut); 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(concurrency);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Kinesis 具有以下初始配置

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                "LATEST");

有趣的是,当我更改 Kinesis 配置以回复事件时,即

 kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                "TRIM_HORIZON");

Flink 正在从 Kinesis 接收所有缓冲记录(这包括 Flink 应用程序关闭之前、期间和之后生成的事件)并进行处理。因此,这种行为违反了 Flink 应用程序的“Exactly Once”属性。

有人可以指出我缺少的一些明显的东西吗?

apache-flink flink-streaming amazon-kinesis
3个回答
2
投票

Flink Kinesis 连接器确实将分片序列号存储在状态中以进行一次性处理。

从您的描述来看,似乎在您的工作“重新启动”时,检查点状态不受尊重。

首先要消除显而易见的问题: 重启后你的工作恢复得怎么样了? 您是从保存点恢复,还是从上一个检查点自动重新启动?


0
投票

如果您想使用检查点来跟踪消费者在流中的弹出位置,那么前面的答案是一个不错的选择。

这是一种具有更多控制能力的替代方案。您可以尝试在 Flink Kinesis Connector 中使用 AT_TIMESTAMP 作为 STREAM_INITIAL_POSITION 配置选项。

此设置需要一个配置选项 STREAM_INITIAL_TIMESTAMP,这是您需要从 Kinesis 读取消息的时间戳。

可以通过多种方式维护时间戳值 - 用于更新文本文件的接收器、用于在外部数据库(如 DynamoDB)中更新的接收器、由启动脚本手动提供等。

当 Flink 应用程序重新启动时,提供上次处理的时间戳作为运行时参数,并在 Kinesis 使用者的配置中使用它。

您的配置将如下所示:

Properties consumerConfig = new Properties();
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
Double startTimeStamp = 1459799926.480; //Parameterize this!
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, startTimeStamp);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));

0
投票

根据您的配置,我在哪里可以从 kinesis 获取 startTimeStamp,谢谢!

© www.soinside.com 2019 - 2024. All rights reserved.