这可能是我缺乏理解,但令人沮丧。我有一个Java Kafka使用者,当分配了一个新创建的groupId和ConsumerID时,它将消耗消息。但是,当在我的Eclipse IDE中停止Java应用程序并使用相同的groupID和ConsumerID重新启动它时,它不会提取任何消息。如果我再次关闭该应用程序,并为其分配新的和不同的groupID / consumerID,它就可以正常工作。谁能帮我弄清楚为什么会这样吗?
下面的配置值
props.put("bootstrap.servers","192.168.5.0:30092,192.168.4.6:30092,192.168.5.8:30092"); props.put("acks", "all"); props.put("retries", 4); props.put("batch.size", 1000); props.put("linger.ms", 1); props.put("buffer.memory", 335544323); props.put(ConsumerConfig.GROUP_ID_CONFIG, "Router2"); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Consumer2"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "300000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
默认情况下,当使用者开始阅读消息时,它会提交已成功使用的消息的偏移量,以使其不再消耗它们。
每次创建新的消费者组时,消费者将从最早的可用偏移量开始消费(因为以前没有为新创建的消费者组提交偏移量)。
消费者从协调员那里收到任务后,必须确定每个分配分区的初始位置。什么时候首先创建该组,然后再使用任何消息,根据可配置的偏移量重置策略设置位置(
auto.offset.reset
)。通常,消费开始于最早的偏移量或最新的偏移量。
有关更多详细信息,请参阅Confluent文档中的Offset Management。
如果您想实现相同的行为,则只需坚持相同的消费群体,只需将auto.offset.reset
设置为earliest
即可,而不是将默认值latest
设置为:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
这样,您的使用者将始终从最早可用的偏移量开始使用消息。
请注意,这与可以与Kafka Console使用者一起使用的--from-beginning
标志具有相同的作用。