我正在尝试在Spring Boot应用程序中监听RabbitMq队列。在我的RabbitConfig.java
文件中,有以下bean:
// RabbitConfig.java
// ...Queue, exchange, binding beans, etc
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setConcurrentConsumers(2);
container.setPrefetchCount(100);
container.setQueueNames(QUEUE_NAME);
return container;
}
@Bean
public AmqpTemplate getTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter(mapper));
return template;
}
我已经创建了一个侦听器容器,因为我需要对其进行一些设置,例如其预取计数。
我还创建了另一个类作为消息侦听器,如下所示:
@Component
public class MyMessageListener {
@RabbitListener(queues = QUEUE_NAME)
public void messageHandler(MyMessageObj message, Channel channel) throws IOException {
// process message...
}
但是,当我运行该应用程序时,它会引发错误Error Handler converted exception to fatal
。由于容器期望messageHandler
具有不同的方法签名,因此似乎发生了错误。
我想我可能在容器中做了一些不正确的事情,因为当我删除它时,应用程序可以运行并侦听队列,但不能配置容器中的设置。
我做错了什么,应该怎么做才能使容器正确使用messageHandler
?
[您能否尝试实现一个实现org.springframework.amqp.core.MessageListener并实现方法“ onMessage(Message message)”的MessageListener类,]
[拥有定义了SimpleMessageListenerContainer的@Configuration类之后,您需要具有一个实现org.springframework.amqp.core.MessageListener并实现方法“ onMessage(Message message)”的相应类。
让我知道是否有帮助。
@配置公共类MessageConfig {
private static String queue = "ARRQueue"; @Bean ConnectionFactory connectionFactory(){ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory("localhost"); cachingConnectionFactory.setUsername("guest"); cachingConnectionFactory.setPassword("guest"); return cachingConnectionFactory; } @Bean MessageListenerContainer messageListenerContainer(){ SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(); simpleMessageListenerContainer.setConnectionFactory(connectionFactory()); simpleMessageListenerContainer.setQueues(new Queue(queue, true)); simpleMessageListenerContainer.setMessageListener(new MessageListener()); return simpleMessageListenerContainer; }
}
公共类MessageListener实现org.springframework.amqp.core.MessageListener {@Override公共无效onMessage(消息){System.out.println(“ MessageListener.onMessage:” +新的String(message.getBody()));}