RabbitMQ Direct Reply-to issue with Spring


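I am working on an application that sends a message to another application acting as a server, which then modifies the given message and sends it back to the amq.rabbitmq.reply-to queue using Direct Reply-to. I followed the tutorial at https://www.rabbitmq.com/direct-reply-to.html, but I ran into some problems implementing it. As I understand it, I need to consume messages from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode, which in my case means using a MessageListenerContainer. Here is my configuration: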

@Bean
    public Jackson2JsonMessageConverter messageConverter() {    
        ObjectMapper mapper = new ObjectMapper();
        return new Jackson2JsonMessageConverter(mapper);
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        rabbitTemplate.setReplyAddress("amq.rabbitmq.reply-to");
        return rabbitTemplate;
    }

    @Bean
    MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory ) {       
        DirectMessageListenerContainer directMessageListenerContainer = new DirectMessageListenerContainer();
        directMessageListenerContainer.setConnectionFactory(connectionFactory);
        directMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE);
        directMessageListenerContainer.setQueueNames("amq.rabbitmq.reply-to");
        directMessageListenerContainer.setMessageListener(new PracticalMessageListener());
        return directMessageListenerContainer;

    }

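The message is sent as JSON in a SEND frame over STOMP and converted. A new queue is then created dynamically and added to the MessageListenerContainer. So when the message arrives at the broker, I want to modify it on the server side and send it back to amq.rabbitmq.reply-to, while the original message is sent to the routing key messageTemp.getTo(), which was subscribed to in a STOMP SUBSCRIBE frame.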

  @MessageMapping("/private")
  public void send2(MessageTemplate messageTemp) throws Exception {
      MessageTemplate privateMessage = new MessageTemplate(messageTemp.getPerson(),
              messageTemp.getMessage(), 
              messageTemp.getTo());

     AbstractMessageListenerContainer abstractMessageListenerContainer =
              (AbstractMessageListenerContainer) mlc;

       // here's the queue added to listener container   
      abstractMessageListenerContainer.addQueueNames(messageTemp.getTo());

      MessageProperties mp = new MessageProperties();
      mp.setReplyTo("amq.rabbitmq.reply-to");
      mp.setCorrelationId("someId");

      Jackson2JsonMessageConverter smc = new Jackson2JsonMessageConverter();
      Message message = smc.toMessage(messageTemp, mp);


      rabbitTemplate.sendAndReceive( 
              messageTemp.getTo() , message);
  }

When the message is sent to the messageTemp.getTo() routing key, it is modified in the onMessage method:

@Component
public class PracticalMessageListener implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        System.out.println(("message listener.."));
        String body = "{ \"processing\": \"123456789\"}";
       MessageProperties properties = new MessageProperties();

       // some business logic on the message body

        properties.setCorrelationId(message.getMessageProperties().getCorrelationId());
        Message responseMessage = new Message(body.getBytes(), properties);

        rabbitTemplate.convertAndSend("", 
                message.getMessageProperties().getReplyTo(), responseMessage);
    }

I may be misunderstanding the concept of Direct Reply-to and the documentation, which states:

Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode. There is no need to declare this "queue" first, although the client can do so if it wants.

The question is: what do I need to consume from that queue? And how can I access the modified message when I get this error:

2020-01-15 22:17:09.688  WARN 96222 --- [pool-1-thread-5] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
Caused by: java.lang.NullPointerException: null
    at com.patrykmaryn.spring.second.PracticalMessageListener.onMessage(PracticalMessageListener.java:50) ~[classes/:na]

The exception is thrown at the call to rabbitTemplate.convertAndSend in PracticalMessageListener.
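One likely cause, judging from the configuration shown earlier: the container bean registers the listener with new PracticalMessageListener(), so that instance is not managed by Spring and its @Autowired rabbitTemplate field stays null. The fragment below is a sketch of one way around this, not a confirmed fix. It assumes PracticalMessageListener is picked up by component scanning and lives in the same package as the configuration class; ListenerContainerConfig is a hypothetical name.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; only the listener wiring differs from the question.
@Configuration
class ListenerContainerConfig {

    @Bean
    MessageListenerContainer messageListenerContainer(
            ConnectionFactory connectionFactory,
            PracticalMessageListener listener) { // injected by Spring, so its @Autowired fields are set
        DirectMessageListenerContainer container =
                new DirectMessageListenerContainer(connectionFactory);
        container.setAcknowledgeMode(AcknowledgeMode.NONE);
        // Use the Spring-managed bean instead of "new PracticalMessageListener()".
        container.setMessageListener(listener);
        return container;
    }
}
```

The sketch leaves out setQueueNames("amq.rabbitmq.reply-to") on the assumption that queues are still added at runtime with addQueueNames, as in the question's send2 method.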

Edit

I got rid of the DirectMessageListenerContainer set up on amq.rabbitmq.reply-to and implemented a DirectReplyToMessageListenerContainer instead:

@Bean
    DirectReplyToMessageListenerContainer drtmlc (ConnectionFactory connectionFactory) {
        DirectReplyToMessageListenerContainer drtmlc =
                new DirectReplyToMessageListenerContainer(connectionFactory);
        drtmlc.setConnectionFactory(connectionFactory);
        drtmlc.setAcknowledgeMode(AcknowledgeMode.NONE);
        drtmlc.setMessageListener(new DirectMessageListener());
        return drtmlc;
    }

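The problem must be in the onMessage method, which does not seem to allow invoking any send method on rabbitTemplate; I also tried other existing routing keys and exchanges. The listener consumes from the queue defined with the routing key messageTemp.getTo().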

@Override
    public void onMessage(Message message) {
        System.out.println(("message listener.."));

        String receivedRoutingKey = message.getMessageProperties()
           .getReceivedRoutingKey();
        System.out.println(" This is received routingkey: " + 
            receivedRoutingKey);

           /// ..... rest of code goes here

        rabbitTemplate.convertAndSend("", 
                message.getMessageProperties().getReplyTo(), 
                responseMessage);

messageTemp.getTo() is a routing key defined at runtime by choosing a receiver; for example, if I choose 'user1', it prints 'user1'.

This is the first time a message is sent:

2020-01-16 02:22:20.213 DEBUG 28490 --- [nboundChannel-6] .WebSocketAnnotationMethodMessageHandler : Searching methods to handle SEND /app/private session=45yca5sy, lookupDestination='/private'
2020-01-16 02:22:20.214 DEBUG 28490 --- [nboundChannel-6] .WebSocketAnnotationMethodMessageHandler : Invoking PracticalTipSender#send2[1 args]
2020-01-16 02:22:20.239  INFO 28490 --- [nboundChannel-6] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=user1, consumerTag=amq.ctag-Evyiweew4C-K1mXmy2XqUQ identity=57b19488] started
2020-01-16 02:22:20.268  INFO 28490 --- [nboundChannel-6] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-01-16 02:22:20.269  INFO 28490 --- [nboundChannel-6] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]
2020-01-16 02:22:20.286  INFO 28490 --- [nboundChannel-6] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-IXWf-zEyI34xzQSSfbijzg identity=4bedbba5] started

The second time it fails:

2020-01-16 02:23:20.247 DEBUG 28490 --- [nboundChannel-3] .WebSocketAnnotationMethodMessageHandler : Searching methods to handle SEND /app/private session=45yca5sy, lookupDestination='/private'
2020-01-16 02:23:20.248 DEBUG 28490 --- [nboundChannel-3] .WebSocketAnnotationMethodMessageHandler : Invoking PracticalTipSender#send2[1 args]
2020-01-16 02:23:20.248  WARN 28490 --- [nboundChannel-3] o.s.a.r.l.DirectMessageListenerContainer : Queue user1 is already configured for this container: org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer@3b152928, ignoring add
2020-01-16 02:23:20.250  WARN 28490 --- [nboundChannel-3] o.s.a.r.l.DirectMessageListenerContainer : Queue user1 is already configured for this container: org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer@3b152928, ignoring add
message listener..
 This is received routingkey: user1
2020-01-16 02:23:20.271  WARN 28490 --- [pool-1-thread-5] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
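
The "already configured for this container ... ignoring add" warnings in the second log appear because send2 calls addQueueNames on every message. They are unlikely to be the cause of the exception, but they can be avoided by remembering which queue names were already registered. A minimal sketch in plain Java: QueueRegistrar and registerIfAbsent are hypothetical names, and the Consumer stands in for the container's addQueueNames call.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: forwards a queue name to the container only the first time it is seen.
final class QueueRegistrar {

    // Thread-safe set, since message-handling methods may run on several threads.
    private final Set<String> registered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> addQueue;

    QueueRegistrar(Consumer<String> addQueue) {
        this.addQueue = addQueue;
    }

    /** Returns true if the queue name was new and has been passed on, false if it was skipped. */
    boolean registerIfAbsent(String queueName) {
        if (registered.add(queueName)) {
            addQueue.accept(queueName);
            return true;
        }
        return false;
    }
}
```

In send2 this could be wired as new QueueRegistrar(name -> abstractMessageListenerContainer.addQueueNames(name)), created once and reused across calls.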


java spring rabbitmq spring-rabbitmq
1 Answer

Yes, you have misunderstood the feature.
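
As an illustration (this sketch is not part of the original answer): with Direct Reply-to, the reply can only be consumed on the same connection and channel that published the request, so a separate listener container subscribed to amq.rabbitmq.reply-to will not receive it. RabbitTemplate's send-and-receive methods handle the reply consumer internally, so the client needs no reply container of its own. The fragment below assumes Spring Boot auto-configuration (ConnectionFactory, RabbitTemplate, RabbitAdmin, @EnableRabbit) and the default message converter; the queue name rpc.requests and the classes RpcServer and RpcClient are hypothetical.

```java
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

// Server side: the return value is sent to the address in the request's replyTo property.
@Component
class RpcServer {

    @RabbitListener(queuesToDeclare = @Queue("rpc.requests")) // hypothetical queue name
    public String handle(String request) {
        // Business logic on the message body would go here.
        return "{ \"processing\": \"" + request + "\" }";
    }
}

// Client side: no reply address and no reply listener container are configured.
@Component
class RpcClient {

    private final RabbitTemplate rabbitTemplate;

    RpcClient(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public Object call(String payload) {
        // Publishes to the default exchange and blocks until the reply arrives
        // or the template's reply timeout expires (in which case null is returned).
        return rabbitTemplate.convertSendAndReceive("", "rpc.requests", payload);
    }
}
```

The value returned by call is the modified message, which answers the "how can I access it" part of the question under these assumptions.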
