我有一个生成特定主题消息的生产者:
制作人:
// Produce a message to a Kafka topic
String topic = "dev.topic.proxy";
String key = "some-key";
String value = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
while (true) {
producer.send(record);
System.out.println("Message has been sent");
}
消费者:
<routes xmlns="http://camel.apache.org/schema/spring">
<route id="readMsg">
<description>Route to refresh the proxy routes</description>
<from uri="appKafkaConsumer:dev.topic.proxy" />
<log message="Received message with headers: ${headers}" loggingLevel="INFO" />
<delay>
<constant>5000</constant>
</delay>
</route>
</routes>
这就是camel中的消费者。当我启动应用程序时,groupId 是随机自动生成的。我停止了应用程序,当我重新启动应用程序时,由于分配新的 groupId,许多消息丢失了。我不想丢失消息。解决这个问题的最佳方法是什么?
确保您已将 kafka 消费者配置为从最早的偏移量而不是最新的偏移量开始消费。
请注意,这将导致消费者重新处理已经收到的消息。为了让特定的消费者只处理一次消息,它必须保持相同的消费者组id。
如果您希望所有消息在所有消费者中处理一次且仅处理一次,那么我建议查看 ActiveMQ 或类似的队列解决方案,而不是 Kafka。