在kafka或使用者重启期间是否有任何配置可用于清除未提交的消息?

问题描述 投票:0回答:1

我有一个业务场景,消费者不应使用主题中已提交/未提交的消息当使用者或kafka重新启动时,我尝试应用auto.offset.reset: latest。但是它从主题中提取了未提交的偏移量。拥有一个带有一个带有1个主题和1个分区的实例的应用程序。假设我发布了10条消息,消费者选择了5条消息并提交了偏移量。现在,我重新启动我的消费者实例/ kafka。重新启动后,它不应再选择旧的5条消息尚未提交。正在寻找任何其他配置或解决方法。

spring-boot apache-kafka spring-cloud-stream
1个回答
0
投票
您需要确保您的使用者在每次重新启动应用程序时获得一个新的使用者组(在Java API中,该使用者的配置称为group.id)。即使重新启动代理,仍将使用新的group.id重新启动应用程序。并保留配置auto.offset.reset=latest

另一种选择是,在每个代理重新启动后,手动更改使用者组的偏移量。 Kafka带有ConsumerGroupCommand工具。您可以在Kafka [文档] [1]中找到一些信息。

如果您打算重置特定的使用者组(“ myConsumerGroup”),则可以使用

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group myConsumerGroup --topic topic1 --to-latest

根据您的要求,您可以使用该工具为主题的每个分区重置偏移量。帮助功能或文档说明了这些选项。
© www.soinside.com 2019 - 2024. All rights reserved.