如何让kafka消费者从上次消费的偏移中读取,而不是从一开始就读取

问题描述 投票:10回答:3

我是kafka的新手并试图了解是否有办法从上次消耗的偏移中读取消息,但不是从头开始。

我正在写一个案例,所以我的意图不会偏离。

Eg:
1) I produced 5 messages at 7:00 PM and console consumer consumed those.
2) I stopped consumer at 7:10 PM
3) I produced 10 message at 7:20 PM. No consumer had read those messages.
4) Now, i have started console consumer at 7:30 PM, without from-beginning.
5) Now, it Will read the messages produced after it has started. Not the earlier ones, which were produced at 7.20 PM

有没有办法从最后消耗的偏移量中获取消息。?

apache-kafka kafka-consumer-api
3个回答
9
投票

我是kafka的新手并试图了解是否有办法从上次消耗的偏移中读取消息,但不是从头开始。

是的,可以使用控制台消费者来读取上次消耗的偏移量。您必须在调用kafka-console-consumer时添加consumer.config标志。

例:-

[root@sandbox bin]# ./kafka-console-consumer.sh --topic test1 --zookeeper localhost:2181 --consumer.config /home/mrnakumar/consumer.properties

这里/home/mrnakumar/consumer.properties是一个包含group.id的文件。以下是/home/mrnakumar/consumer.properties的外观: -

group.id = consoleGroup

在不使用consumer.config的情况下,可以从[开始使用--from-beginning]开始读取,也可以从仅Log开始读取。日志结束表示消费者启动后发布的所有消息。


3
投票

您应该在auto.offset.reset上的消费者配置中设置largest参数,这样它将在最后提交的偏移量之后读取所有消息。


3
投票

在消费者配置中设置auto.offset.reset=earliest和固定的group.id=something将在最后提交的偏移量处启动消费者。在你的情况下,应该在7:20的第一条消息开始消费。如果你想让它开始阅读它开始后发布的消息,那么auto.offset.reset=latest将忽略在7:20发送的10条消息,并在它启动后读取任何进来的消息。

如果您希望它从头开始,您必须在第一个seekToBeginning之后调用consumer.poll(),或者将使用者组ID更改为唯一的。

© www.soinside.com 2019 - 2024. All rights reserved.