有什么方法可以在Java API中从一个特定的偏移量开始消耗kafka主题?

问题描述 投票:0回答:1

我正在使用Kafka Stream API。当我启动我的应用程序时,有时会有一个间隙,我想从一个特定的偏移开始消耗。最早或最新不是我想要的。

streamProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

我想找的是这样一个场景,比如我在配置文件中设置偏移数或以毫秒为单位的日期,然后从那个点开始消耗。我想知道是否有办法实现这个目标?

java apache-kafka kafka-consumer-api apache-kafka-streams
1个回答
2
投票

配置 auto.offset.reset 只对第一次启动应用程序时尚未提交偏移量有效。如果偏移被提交,应用程序将始终从已提交的偏移开始恢复处理。

在Kafka流中,没有API可以显式地设置启动偏移量。消费者API将允许通过 Consumer#seek().

对于Kafka流,获得你想要的行为的一种方法是,停止应用程序,使用 bin/kafka-consmer.group.sh (或者更好 bin/kafka-streams-application-reset.sh),并提交所需的起始偏移量。如果你在之后启动应用程序,它将接收已提交的偏移量并从那里开始处理。

© www.soinside.com 2019 - 2024. All rights reserved.