Spring AMQP application configuration from XML to Java


I am trying to rewrite a RabbitMQ application configuration from XML to Java. Sadly, as soon as the code runs, a fairly generic error is thrown:

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: 
Failed to invoke target method 'receiveMessage' with 
argument type = [class [B], value = [{[B@3bd0e47}]
...
Caused by: java.lang.NoSuchMethodException: com.mycompany.MessageListener.receiveMessage([B)

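The application works with the XML-based configuration listed below. I tried to rewrite it following the Spring Integration, AMQP, and Rabbit documentation. However, the Spring configuration documentation is mostly XML-based, hence my question.

XML configuration: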

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
       xsi:schemaLocation="http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/amqp
        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


    <rabbit:connection-factory id="connectionFactory" host="mycompany-host"
                               username="mycompany-username"
                               password="mycompany-password"
                               virtual-host="mycompany-vhost"/>

    <rabbit:template id="mycompany-template" connection-factory="connectionFactory" />

    <rabbit:admin id="admin" connection-factory="connectionFactory" />

 <!-- ##### -->

    <rabbit:queue id="queue-id" name="queue-name" declared-by="admin"/>

    <rabbit:direct-exchange name="mycompany-incoming-events" declared-by="admin">
        <rabbit:bindings>
            <rabbit:binding queue="queue-name" key="" />
        </rabbit:bindings>
    </rabbit:direct-exchange>


 <!-- ##### -->


    <int-amqp:inbound-channel-adapter channel="mycompany-channel"
                                      queue-names="queue-name" connection-factory="connectionFactory" />

    <int:chain input-channel="mycompany-channel">
        <int:transformer>
            <bean class="com.mycompany.MyCompanyParser"/>
        </int:transformer>
        <int:filter expression="payload.header != null"/>
        <int:transformer>
            <bean class="com.mycompany.MyCompanyHeaderEnricher"/>
        </int:transformer>
        <int:recipient-list-router>
            <int:recipient channel="dataSubmittedChannel"/>
        </int:recipient-list-router>
    </int:chain>

    <int:chain input-channel="dataSubmittedChannel">
        <int:filter expression="headers.mycompany_enriched_header.name().equals('MY_COMPANY_CONSTRAINT')" />
        <int:service-activator>
            <bean class="com.mycompany.MessageListener"/>
        </int:service-activator>
    </int:chain>

</beans>

Java listener:

@Component
public class MessageListener {

    public void receiveMessage(final MyCompanyParsedType msg){
        System.out.println(msg.toString());
    }
}

After some rewriting, I came up with this Java-based configuration:

import com.nxcs3.gamefetcher.configuration.SampleConfiguration;
import com.nxcs3.gamefetcher.listener.GameMessageListener;
import nxcs.drept.nxcs2events.EventHeadersEnricher;
import nxcs.drept.nxcs2events.EventParser;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.dsl.IntegrationFlow;

@SpringBootApplication
public class MyCompanySpringBootApp {

    public static final String MESSAGE_QUEUE = "queue-name";
    public static final String MESSAGE_EXCHANGE = "mycompany-incoming-events";

    public static void main(String[] args) {
        SpringApplication.run(MyCompanySpringBootApp.class);
    }


    @Bean
    public DirectExchange exchange(){
        return new DirectExchange(MESSAGE_EXCHANGE);
    }

    @Bean
    public Queue queue(){
        return new Queue(MESSAGE_QUEUE, true);
    }

    @Bean
    public Binding binding(Queue queue){
        return BindingBuilder.bind(queue).to(exchange()).with(MESSAGE_QUEUE);
    }

    @Bean
    MessageListenerAdapter listenerAdapter(MessageListener receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    public IntegrationFlow flow(){
        return f -> f.log()
                .transform(new MyCompanyParser())
                .filter("payload.header != null")
                .transform(new MyCompanyHeaderEnricher())
                .filter("headers.mycompany_enriched_header.name().equals('MY_COMPANY_CONSTRAINT')");
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(MESSAGE_QUEUE);
        container.setMessageListener(listenerAdapter);
        return container;
    }
}

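I provide the connection details through YAML. As I mentioned earlier, I am obviously missing something. Where does the configuration go wrong?

Section added after the comments suggesting a solution:

So I removed MessageListenerAdapter and replaced it with AmqpInboundChannelAdapter and @ServiceActivator.

The result looks like: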

@SpringBootApplication
public class MyCompanySpringBootApp {

    public static final String MESSAGE_QUEUE = "queue-name";
    public static final String MESSAGE_EXCHANGE = "mycompany-incoming-events";

    public static void main(String[] args) {
        SpringApplication.run(MyCompanySpringBootApp.class);
    }


    @Bean
    public DirectExchange exchange(){
        return new DirectExchange(MESSAGE_EXCHANGE);
    }

    @Bean
    public Queue queue(){
        return new Queue(MESSAGE_QUEUE, true);
    }

    @Bean
    public Binding binding(Queue queue){
        return BindingBuilder.bind(queue).to(exchange()).with(MESSAGE_QUEUE);
    }

    @Bean
    public AmqpInboundChannelAdapter channelAdapter(SimpleMessageListenerContainer container){
        AmqpInboundChannelAdapter amqpInboundChannelAdapter = new AmqpInboundChannelAdapter(container);
        amqpInboundChannelAdapter.setOutputChannelName("adapter");
        return amqpInboundChannelAdapter;
    }

    @Bean
    public MessageListener handler(){
        return new MessageListener();
    }


    @Bean
    public IntegrationFlow flow(){
        return f -> f.log()
                .transform(new MyCompanyParser())
                .filter("payload.header != null")
                .transform(new MyCompanyHeaderEnricher())
                .filter("headers.mycompany_enriched_header.name().equals('MY_COMPANY_CONSTRAINT')");
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(MESSAGE_QUEUE);
        container.setMessageListener(listenerAdapter);
        return container;
    }
}

And the listener:

@Component
public class MessageListener {

    @ServiceActivator(inputChannel = "adapter")
    public void receiveMessage(final MyCompanyParsedType msg){
        System.out.println(msg.toString());
    }
}

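This gets us a bit closer, because messages are now accepted and processed inside the receiveMessage method. However, the incoming messages somehow do not pass through the IntegrationFlow filters. The messages seem to be completely random. I added the imports.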

spring rabbitmq spring-integration spring-amqp spring-rabbit
1 answer

MessageListenerAdapter uses a SimpleMessageConverter by default, and that converter's logic is based on the presence of the contentType property.

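According to your error, it sounds as though the consumed message does not carry this property, so the converter falls back to message.getBody(), which is a byte[] anyway.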
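
To make the failure mode concrete, here is a minimal plain-Java sketch of the chain described above. It is a simplified model and not Spring's actual code: `convert`, `tryLookup`, `ParsedType`, and `Listener` are hypothetical names introduced only for this illustration. It assumes a converter that returns the raw bytes when no usable contentType is present, and a method lookup driven by the runtime type of the converted payload.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

public class ConverterFallbackDemo {

    /** Hypothetical stand-in for the parsed payload type from the question. */
    public static class ParsedType { }

    /** Mirrors the listener from the question: only a ParsedType overload exists. */
    public static class Listener {
        public void receiveMessage(ParsedType msg) { }
    }

    /** Simplified model of a content-type-driven converter (assumption, not Spring code). */
    public static Object convert(byte[] body, String contentType) {
        if (contentType != null && contentType.startsWith("text")) {
            return new String(body, StandardCharsets.UTF_8);
        }
        // No usable contentType: hand back the raw bytes unchanged.
        return body;
    }

    /** Looks up receiveMessage using the runtime type of the converted payload. */
    public static String tryLookup(Object payload) {
        try {
            Method m = Listener.class.getMethod("receiveMessage", payload.getClass());
            return "found " + m.getName();
        } catch (NoSuchMethodException e) {
            return "NoSuchMethodException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A message without contentType stays a byte[] after conversion.
        Object payload = convert("{}".getBytes(StandardCharsets.UTF_8), null);
        System.out.println(payload.getClass().getName()); // prints "[B"
        // There is no receiveMessage(byte[]) overload, so the lookup fails.
        System.out.println(tryLookup(payload));
        // With the expected type, the lookup succeeds.
        System.out.println(tryLookup(new ParsedType()));
    }
}
```

The `[B` in the stack trace is the JVM's name for `byte[]`, which is why the adapter reports that it cannot find `receiveMessage([B)`.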

You may consider specifying the desired MessageConverter on that MessageListenerAdapter, for example a SerializerMessageConverter with ignoreContentType = true.
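
A rough sketch of that suggestion, applied to the listenerAdapter bean from the question, might look like the following. The class name `ListenerAdapterConfig` is made up for this example, and the sketch assumes the message body really is a Java-serialized object of the type that receiveMessage expects. If the publisher sends some other format (JSON, for instance), a different MessageConverter would be needed.

```java
import com.mycompany.MessageListener;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; only the bean method matters here.
@Configuration
public class ListenerAdapterConfig {

    @Bean
    MessageListenerAdapter listenerAdapter(MessageListener receiver) {
        // Attempt deserialization even when the contentType property is missing.
        SerializerMessageConverter converter = new SerializerMessageConverter();
        converter.setIgnoreContentType(true);

        // Same delegate and method name as in the question, with an explicit converter.
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "receiveMessage");
        adapter.setMessageConverter(converter);
        return adapter;
    }
}
```

With this in place the adapter should no longer hand a raw byte[] to the delegate, provided the stated assumption about the payload format holds.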
