我正在努力将 RabbitMQ 应用程序配置从 XML 重写为 Java。可悲的是,一旦执行代码,就会出现非常普遍的错误:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException:
Failed to invoke target method 'receiveMessage' with
argument type = [class [B], value = [{[B@3bd0e47}]
...
Caused by: java.lang.NoSuchMethodException: com.mycompany.MessageListener.receiveMessage([B)
如果我的配置基于 XML,应用程序就可以工作,如下所列。 我试图根据 Spring Integration、AMQP、Rabbit 文档重写它。尽管如此,spring 配置文档主要是基于 xml 的,因此我的问题。
XML 会议:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp
http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory" host="mycompany-host"
username="mycompany-username"
password="mycompany-password"
virtual-host="mycompany-vhost"/>
<rabbit:template id="mycompany-template" connection-factory="connectionFactory" />
<rabbit:admin id="admin" connection-factory="connectionFactory" />
<!-- ##### -->
<rabbit:queue id="queue-id" name="queue-name" declared-by="admin"/>
<rabbit:direct-exchange name="mycompany-incoming-events" declared-by="admin">
<rabbit:bindings>
<rabbit:binding queue="queue-name" key="" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- ##### -->
<int-amqp:inbound-channel-adapter channel="mycompany-channel"
queue-names="queue-name" connection-factory="connectionFactory" />
<int:chain input-channel="mycompany-channel">
<int:transformer>
<bean class="com.mycompany.MyCompanyParser"/>
</int:transformer>
<int:filter expression="payload.header != null"/>
<int:transformer>
<bean class="com.mycompany.MyCompanyHeaderEnricher"/>
</int:transformer>
<int:recipient-list-router>
<int:recipient channel="dataSubmittedChannel"/>
</int:recipient-list-router>
</int:chain>
<int:chain input-channel="dataSubmittedChannel">
<int:filter expression="headers.mycompany_enriched_header.name().equals('MY_COMPANY_CONSTRAINT')" />
<int:service-activator>
<bean class="com.mycompany.MessageListener"/>
</int:service-activator>
</int:chain>
</beans>
Java 监听器:
@Component
public class MessageListener {
public void receiveMessage(final MyCompanyParsedType msg){
System.out.println(msg.toString());
}
}
经过一些重写,我设法想出了这个基于 Java 的配置:
import com.nxcs3.gamefetcher.configuration.SampleConfiguration;
import com.nxcs3.gamefetcher.listener.GameMessageListener;
import nxcs.drept.nxcs2events.EventHeadersEnricher;
import nxcs.drept.nxcs2events.EventParser;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.dsl.IntegrationFlow;
@SpringBootApplication
public class MyCompanySpringBootApp {
public static final String MESSAGE_QUEUE = "queue-name";
public static final String MESSAGE_EXCHANGE = "mycompany-incoming-events";
public static void main(String[] args) {
SpringApplication.run(MyCompanySpringBootApp.class);
}
@Bean
public DirectExchange exchange(){
return new DirectExchange(MESSAGE_EXCHANGE);
}
@Bean
public Queue queue(){
return new Queue(MESSAGE_QUEUE, true);
}
@Bean
public Binding binding(Queue queue){
return BindingBuilder.bind(queue).to(exchange()).with(MESSAGE_QUEUE);
}
@Bean
MessageListenerAdapter listenerAdapter(MessageListener receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
@Bean
public IntegrationFlow flow(){
return f -> f.log()
.transform(new MyCompanyParser())
.filter("payload.header != null")
.transform(new MyCompanyHeaderEnricher())
.filter("headers.mycompany_enriched_header.name().equals('MY_COMPANY_CONSTRAINT')");
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(MESSAGE_QUEUE);
container.setMessageListener(listenerAdapter);
return container;
}
}
我通过 yaml 提供连接细节。 正如我之前提到的,我显然错过了一些东西。 配置哪里出错了?
评论后添加部分,建议解决方案:
所以我删除了
MessageListenerAdapter
并使用AmqpInboundChannelAdapter
和@ServiceActivator
替换它
结果看起来像:
@SpringBootApplication
public class MyCompanySpringBootApp {
public static final String MESSAGE_QUEUE = "queue-name";
public static final String MESSAGE_EXCHANGE = "mycompany-incoming-events";
public static void main(String[] args) {
SpringApplication.run(MyCompanySpringBootApp.class);
}
@Bean
public DirectExchange exchange(){
return new DirectExchange(MESSAGE_EXCHANGE);
}
@Bean
public Queue queue(){
return new Queue(MESSAGE_QUEUE, true);
}
@Bean
public Binding binding(Queue queue){
return BindingBuilder.bind(queue).to(exchange()).with(MESSAGE_QUEUE);
}
@Bean
public AmqpInboundChannelAdapter
channelAdapter(SimpleMessageListenerContainer container){
AmqpInboundChannelAdapter amqpInboundChannelAdapter = new
AmqpInboundChannelAdapter(container);
amqpInboundChannelAdapter.setOutputChannelName("adapter");
return amqpInboundChannelAdapter;
}
@Bean
public MessageListener handler(){
return new MessageListener();
}
@Bean
public IntegrationFlow flow(){
return f -> f.log()
.transform(new MyCompanyParser())
.filter("payload.header != null")
.transform(new MyCompanyHeaderEnricher())
.filter("headers.mycompany_enriched_header.name().equals('MY_COMPANY_CONSTRAINT')");
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(MESSAGE_QUEUE);
container.setMessageListener(listenerAdapter);
return container;
}
}
和听众:
@Component
public class MessageListener {
@ServiceActivator(inputChannel = "adapter")
public void receiveMessage(final MyCompanyParsedType msg){
System.out.println(msg.toString());
}
}
这让我们更接近一点,因为消息是在
receiveMessage
方法内部接受和处理的。
然而,不知何故,即将到来的消息不会通过IntegrationFlow
过滤器。消息似乎完全是随机的。 我添加了进口
MessageListenerAdapter
默认使用SimpleMessageConverter
。
它的逻辑是基于
contentType
属性的存在。
根据你的错误,这听起来好像消费消息中没有这个属性,因此它回退到
message.getBody()
,无论如何都是byte[]
。
您可以考虑在该
MessageConverter
中指定所需的MessageListenerAdapter
,例如SerializerMessageConverter
与ignoreContentType = true
.