如何在kafka中跟踪未消费的消息

问题描述 投票:0回答:1

我有一个生成特定主题消息的生产者:

制作人:

  // Produce a message to a Kafka topic
        String topic = "dev.topic.proxy";
        String key = "some-key";
        String value = "Hello, Kafka!";

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        while (true) {
            producer.send(record);
            System.out.println("Message has been sent");
        }

消费者:

<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="readMsg">
        <description>Route to refresh the proxy routes</description>
       <from uri="appKafkaConsumer:dev.topic.proxy" />
        <log message="Received message with headers: ${headers}" loggingLevel="INFO" />
        <delay>
        <constant>5000</constant>
    </delay>
    </route>
</routes>

这就是camel中的消费者。当我启动应用程序时,groupId 是随机自动生成的。我停止了应用程序,当我重新启动应用程序时,由于分配新的 groupId,许多消息丢失了。我不想丢失消息。解决这个问题的最佳方法是什么?

spring-boot apache-kafka apache-camel
1个回答
0
投票

确保您已将 kafka 消费者配置为从最早的偏移量而不是最新的偏移量开始消费。

请注意,这将导致消费者重新处理已经收到的消息。为了让特定的消费者只处理一次消息,它必须保持相同的消费者组id。

如果您希望所有消息在所有消费者中处理一次且仅处理一次,那么我建议查看 ActiveMQ 或类似的队列解决方案,而不是 Kafka。

© www.soinside.com 2019 - 2024. All rights reserved.