我有一个业务场景,消费者不应使用主题中已提交/未提交的消息当使用者或kafka重新启动时,我尝试应用auto.offset.reset: latest
。但是它从主题中提取了未提交的偏移量。拥有一个带有一个带有1个主题和1个分区的实例的应用程序。假设我发布了10条消息,消费者选择了5条消息并提交了偏移量。现在,我重新启动我的消费者实例/ kafka。重新启动后,它不应再选择旧的5条消息尚未提交。正在寻找任何其他配置或解决方法。
group.id
)。即使重新启动代理,仍将使用新的group.id
重新启动应用程序。并保留配置auto.offset.reset=latest
。另一种选择是,在每个代理重新启动后,手动更改使用者组的偏移量。 Kafka带有ConsumerGroupCommand工具。您可以在Kafka [文档] [1]中找到一些信息。
如果您打算重置特定的使用者组(“ myConsumerGroup”),则可以使用
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group myConsumerGroup --topic topic1 --to-latest
根据您的要求,您可以使用该工具为主题的每个分区重置偏移量。帮助功能或文档说明了这些选项。