SimpleMessageListenerContainer无法正确调用消息处理程序

问题描述 投票:0回答:1

我正在尝试在Spring Boot应用程序中监听RabbitMq队列。在我的RabbitConfig.java文件中,有以下bean:

// RabbitConfig.java
// ...Queue, exchange, binding beans, etc

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setConcurrentConsumers(2);
    container.setPrefetchCount(100);
    container.setQueueNames(QUEUE_NAME);
    return container;
}

@Bean
public AmqpTemplate getTemplate(ConnectionFactory connectionFactory) {
    final RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(new Jackson2JsonMessageConverter(mapper));
    return template;
}

我已经创建了一个侦听器容器,因为我需要对其进行一些设置,例如其预取计数。

我还创建了另一个类作为消息侦听器,如下所示:

@Component
public class MyMessageListener {
    @RabbitListener(queues = QUEUE_NAME)
    public void messageHandler(MyMessageObj message, Channel channel) throws IOException {
        // process message...
}

但是,当我运行该应用程序时,它会引发错误Error Handler converted exception to fatal。由于容器期望messageHandler具有不同的方法签名,因此似乎发生了错误。

我想我可能在容器中做了一些不正确的事情,因为当我删除它时,应用程序可以运行并侦听队列,但不能配置容器中的设置。

我做错了什么,应该怎么做才能使容器正确使用messageHandler

java spring-boot rabbitmq amqp spring-rabbitmq
1个回答
0
投票

[您能否尝试实现一个实现org.springframework.amqp.core.MessageListener并实现方法“ onMessage(Message message)”的MessageListener类,]

[拥有定义了SimpleMessageListenerContainer的@Configuration类之后,您需要具有一个实现org.springframework.amqp.core.MessageListener并实现方法“ onMessage(Message message)”的相应类。

让我知道是否有帮助。

@配置公共类MessageConfig {

private static String queue = "ARRQueue";
@Bean
ConnectionFactory connectionFactory(){
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost");
    cachingConnectionFactory.setUsername("guest");
    cachingConnectionFactory.setPassword("guest");
    return cachingConnectionFactory;
}

@Bean
MessageListenerContainer messageListenerContainer(){
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setConnectionFactory(connectionFactory());
    simpleMessageListenerContainer.setQueues(new Queue(queue, true));
    simpleMessageListenerContainer.setMessageListener(new MessageListener());
    return simpleMessageListenerContainer;
}

}

公共类MessageListener实现org.springframework.amqp.core.MessageListener {@Override公共无效onMessage(消息){System.out.println(“ MessageListener.onMessage:” +新的String(message.getBody()));}

© www.soinside.com 2019 - 2024. All rights reserved.