Java Kafka Consumer使用新的groupID / ConsumerID可以正常工作,但是在关闭并重新启动时,它不会消耗任何消息

问题描述 投票:1回答:1

这可能是我缺乏理解,但令人沮丧。我有一个Java Kafka使用者,当分配了一个新创建的groupId和ConsumerID时,它将消耗消息。但是,当在我的Eclipse IDE中停止Java应用程序并使用相同的groupID和ConsumerID重新启动它时,它不会提取任何消息。如果我再次关闭该应用程序,并为其分配新的和不同的groupID / consumerID,它就可以正常工作。谁能帮我弄清楚为什么会这样吗?

下面的配置值

      props.put("bootstrap.servers","192.168.5.0:30092,192.168.4.6:30092,192.168.5.8:30092");
    props.put("acks", "all");
    props.put("retries", 4);
    props.put("batch.size", 1000);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 335544323);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "Router2");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Consumer2");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "300000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
java apache-kafka kafka-consumer-api
1个回答
0
投票

默认情况下,当使用者开始阅读消息时,它会提交已成功使用的消息的偏移量,以使其不再消耗它们。

每次创建新的消费者组时,消费者将从最早的可用偏移量开始消费(因为以前没有为新创建的消费者组提交偏移量)。

消费者从协调员那里收到任务后,必须确定每个分配分区的初始位置。什么时候首先创建该组,然后再使用任何消息,根据可配置的偏移量重置策略设置位置(auto.offset.reset)。通常,消费开始于最早的偏移量或最新的偏移量。

有关更多详细信息,请参阅Confluent文档中的Offset Management


如果您想实现相同的行为,则只需坚持相同的消费群体,只需将auto.offset.reset设置为earliest即可,而不是将默认值latest设置为:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

这样,您的使用者将始终从最早可用的偏移量开始使用消息。


请注意,这与可以与Kafka Console使用者一起使用的--from-beginning标志具有相同的作用。

© www.soinside.com 2019 - 2024. All rights reserved.