RabbitMQ: round-robin across multiple consumers

Question (votes: 0, answers: 1)

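The project consists of three main parts:

  • Producers (devices) that send data to the broker over the MQTT protocol
  • The RabbitMQ broker
  • A consumer (Spring AMQP) that handles communication between the application and the broker.

Producer (device, MQTT protocol) --> RabbitMQ <-- Consumer (Spring app, AMQP protocol)

I use the MQTTX Client Toolbox to simulate a producer: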

quick.white.rabbit - topic
ewogICAibWF0cml4IjoiZm9sbG93IHRoZSB3aGl0ZSByYWJiaXTigKYiLAogICAiZGF0ZSI6Ik1hcmNoIDMxLCAxOTk5Igp9 - Base64-encoded data
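
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean
    public TopicExchange topic() {
        return new TopicExchange("amq.topic"); // is it OK to use the default name amq.topic?
    }

    // Each application instance gets its own randomly named, auto-delete queue.
    @Bean
    public Queue autoDeleteQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding binding(TopicExchange topic,
                           Queue autoDeleteQueue) {
        return BindingBuilder.bind(autoDeleteQueue)
                .to(topic)
                .with("*.*.rabbit");
    }
}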
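
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQConsumer {

    // Prints the message headers followed by the decoded payload.
    static void dumb(Object payload, Map<String, Object> headers){
        System.out.println("-----------------------------------");
        headers.forEach((k,v) -> System.out.println(k + "=" + v));
        System.out.println(payload);
    }

    @RabbitListener(queues = "#{autoDeleteQueue.name}")
    public void consumePayload(@Payload String encodedMessage, @Headers Map<String, Object> headers){
        // Decode explicitly as UTF-8; the payload contains non-ASCII characters.
        String payload = new String(Base64.getDecoder().decode(encodedMessage), StandardCharsets.UTF_8);
        dumb(payload, headers);
    }
}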

I run two instances of the same consumer application on different ports, 8081 and 8082:

    java -jar target/consumer_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8081
    java -jar target/consumer_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8082

The problem is that every message arrives at both consumers:

java -jar target/deep_dive_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8081
-----------------------------------
amqp_receivedDeliveryMode=NON_PERSISTENT
amqp_receivedRoutingKey=quick.white.rabbit
amqp_receivedExchange=amq.topic
x-mqtt-publish-qos=0
x-mqtt-dup=false
amqp_deliveryTag=3
amqp_consumerQueue=spring.gen-X6e3uiz5S-K4_RA0tt89cg
amqp_redelivered=false
id=debc649f-9ae1-82a8-be5f-0a3f2c9e2f87
amqp_consumerTag=amq.ctag-brMleQwCDNmcCHyT_lbF8A
amqp_lastInBatch=false
timestamp=1708633236541
{
   "matrix":"follow the white rabbit…",
   "date":"March 31, 1999"
}
java -jar target/deep_dive_rabbitmq-0.0.1-SNAPSHOT.jar --server.port=8082
-----------------------------------
amqp_receivedDeliveryMode=NON_PERSISTENT
amqp_receivedRoutingKey=quick.white.rabbit
amqp_receivedExchange=amq.topic
x-mqtt-publish-qos=0
x-mqtt-dup=false
amqp_deliveryTag=3
amqp_consumerQueue=spring.gen-Ym1vcVbUQ4aZqnG3Bu4OyA
amqp_redelivered=false
id=dd1a5d91-4972-dcca-0669-dc40f9d85a15
amqp_consumerTag=amq.ctag-zqy6gfLSLaZSf3H3W00M1A
amqp_lastInBatch=false
timestamp=1708633236541
{
   "matrix":"follow the white rabbit…",
   "date":"March 31, 1999"
}
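
How can I configure RabbitMQ for round-robin consumption, so that each message is delivered to only one consumer and the next message goes to the next consumer? At the moment every message is delivered to both consumers.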

    MESSAGE 1 // port 8081
    MESSAGE 2 // port 8082
    MESSAGE 3 // port 8081
    MESSAGE 4 // port 8082
    
rabbitmq amqp spring-amqp spring-rabbit round-robin
1 answer
0 votes
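To achieve this, the exchange type (TopicExchange versus DirectExchange) is not the deciding factor. Each instance declares its own AnonymousQueue (note the different amqp_consumerQueue values in the two logs), and the exchange copies every matching message to each bound queue, so both instances receive it. RabbitMQ distributes messages round-robin only among consumers of the same queue, so both instances have to listen on one shared, named queue. Messages published over MQTT arrive on amq.topic, as the amqp_receivedExchange header shows, so the shared queue can stay bound to that TopicExchange.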
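
Whatever the exchange type, an exchange copies a message to every queue whose binding matches, while a single queue hands each message to only one of its consumers. The following plain-Java toy model illustrates that difference. It does not talk to RabbitMQ, and the names RoundRobinModel, ModelQueue, publish and simulate are invented for this illustration only:

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinModel {

    /** Toy queue: hands each message to exactly one consumer, taking turns. */
    static final class ModelQueue {
        private final List<String> consumers = new ArrayList<>();
        private int next = 0;

        void addConsumer(String name) {
            consumers.add(name);
        }

        String dispatch() {
            String chosen = consumers.get(next);
            next = (next + 1) % consumers.size();
            return chosen;
        }
    }

    /** Toy exchange: copies one message to every bound queue and returns who received it. */
    static List<String> publish(List<ModelQueue> boundQueues) {
        List<String> receivers = new ArrayList<>();
        for (ModelQueue queue : boundQueues) {
            receivers.add(queue.dispatch());
        }
        return receivers;
    }

    /** Publishes messageCount messages and records the receiving ports per message. */
    static List<List<String>> simulate(boolean sharedQueue, int messageCount) {
        List<ModelQueue> bound = new ArrayList<>();
        if (sharedQueue) {
            // One named queue, two consumers attached to it.
            ModelQueue queue = new ModelQueue();
            queue.addConsumer("8081");
            queue.addConsumer("8082");
            bound.add(queue);
        } else {
            // One private queue per instance, as with AnonymousQueue.
            for (String port : List.of("8081", "8082")) {
                ModelQueue queue = new ModelQueue();
                queue.addConsumer(port);
                bound.add(queue);
            }
        }
        List<List<String>> deliveries = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            deliveries.add(publish(bound));
        }
        return deliveries;
    }

    public static void main(String[] args) {
        // prints: queue per instance: [[8081, 8082], [8081, 8082], [8081, 8082], [8081, 8082]]
        System.out.println("queue per instance: " + simulate(false, 4));
        // prints: one shared queue:   [[8081], [8082], [8081], [8082]]
        System.out.println("one shared queue:   " + simulate(true, 4));
    }
}
```

With one queue per instance, every message reaches both ports, which matches the logs in the question. With one shared queue, the ports alternate as requested.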

See the Work Queues tutorial for more information:

https://www.rabbitmq.com/tutorials/tutorial-two-spring-amqp
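
Applied to the configuration from the question, the work-queue pattern could look roughly like the sketch below. It assumes a fixed queue name, rabbit.work, which is a hypothetical name chosen for this example and does not appear in the original post. Both instances declare the same queue, so the broker sees one queue with two consumers:

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // Hypothetical queue name; any fixed name shared by all instances would do.
    static final String SHARED_QUEUE = "rabbit.work";

    @Bean
    public TopicExchange topic() {
        // Same exchange as in the question; the logs show MQTT messages arriving on amq.topic.
        return new TopicExchange("amq.topic");
    }

    @Bean
    public Queue sharedQueue() {
        // Named, durable queue instead of a private AnonymousQueue per instance.
        return new Queue(SHARED_QUEUE, true);
    }

    @Bean
    public Binding binding(TopicExchange topic, Queue sharedQueue) {
        return BindingBuilder.bind(sharedQueue)
                .to(topic)
                .with("*.*.rabbit");
    }
}
```

The listener would then reference the shared bean, for example @RabbitListener(queues = "#{sharedQueue.name}"), and the rest of the consumer can stay unchanged.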
