我使用了 2 个不同的消费者来消费来自同一主题的数据。我预计两个消费者会返回相同的结果。但我发现有些消息被第一个消费者捕获,但第二个消费者未能捕获。我不知道是什么原因造成的。有人可以告诉我怎样才能保证消费中不会丢失消息?
首先我尝试设置一个时间间隔,即1小时。然后我使用2个不同的消费者来获取一个小时内创建的消息。我比较了2个消费者捕获的消息数量,它们是不同的。我尝试设置不同的时间间隔,但不同消费者的结果仍然不同。
它是消息模型。你可以尝试 MessageModel.BROADCASTING。
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// consumer.setMessageModel(MessageModel.BROADCASTING); // you can try this
// consumer.setMessageModel(MessageModel.CLUSTERING); // default
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}