Rocketmq 消息丢失

问题描述 投票:0回答:1

我使用了 2 个不同的消费者来消费来自同一主题的数据。我预计两个消费者会返回相同的结果。但我发现有些消息被第一个消费者捕获,但第二个消费者未能捕获。我不知道是什么原因造成的。有人可以告诉我怎样才能保证消费中不会丢失消息?

首先我尝试设置一个时间间隔,即1小时。然后我使用2个不同的消费者来获取一个小时内创建的消息。我比较了2个消费者捕获的消息数量,它们是不同的。我尝试设置不同的时间间隔,但不同消费者的结果仍然不同。

python message loss rocketmq
1个回答
0
投票

它是消息模型。你可以尝试 MessageModel.BROADCASTING。

  • 消息模型.集群。 image show here

  • MessageModel.BROADCASTING image show here

  • 代码演示

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // consumer.setMessageModel(MessageModel.BROADCASTING); // you can try this
        // consumer.setMessageModel(MessageModel.CLUSTERING); // default 

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.