Sending JMS messages between different Spring Boot applications using Spring Integration

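I am trying to use Spring Integration to send JMS messages from one Spring Boot application (the master) to another Spring Boot application (the worker). I am using embedded ActiveMQ as the broker. Here is what I have done: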

MasterConfig

@Profile("master")
@EnableBatchProcessing
@EnableBatchIntegration
public class MasterConfig {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Bean
    public MessageChannel inputChannel(){
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow jmsOutboundFlow(JmsTemplate jmsTemplate){
        return IntegrationFlows.from(inputChannel())
                .handle(Jms.outboundAdapter(jmsTemplate).destination("requestQueue"))
                .get();
    }

    @Bean
    public ApplicationRunner runner(){
        return args -> {
            for(int i = 1; i <= 3; i++)
                inputChannel().send(MessageBuilder.withPayload("hello " + i).build());
        };
    }

}

WorkerConfig

@Profile("worker")
@EnableBatchIntegration
@EnableBatchProcessing
public class WorkerConfig {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public IntegrationFlow jmsMessageDrivenFlow(){
        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(cachingConnectionFactory).destination("requestQueue"))
                .handle(new MessageHandler() {
                    @Override
                    public void handleMessage(Message<?> message) throws MessagingException {
                        System.out.println(message.getPayload());
                    }
                })
                .get();
    }
}

CommonConfig

@Profile("worker | master")
@Configuration
@EnableIntegration
public class CommonConfig {

    @Value("${activemq.broker-url}")
    private String brokerUrl;

    @Bean
    public ConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
        cf.setBrokerURL(brokerUrl);
        return cf;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory(){
        return new CachingConnectionFactory(connectionFactory());
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate =
                new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setPubSubDomain(false);
        return jmsTemplate;
    }

    @Bean
    public Queue requestQueue(){
        return new ActiveMQQueue("queue.demo");
    }

    @Bean
    public Queue replyQueue(){
        return new ActiveMQQueue("queue.reply");
    }

}

Dependencies in pom.xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jms</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-ftp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

application.properties

activemq.broker-url=vm://localhost

I built the project with

mvn clean package

and then ran the jar twice, each time with a different Spring profile active.

First:
java -jar target/test-jms-integration-0.0.1-SNAPSHOT.jar --spring.profiles.active=worker

Second:
java -jar target/test-jms-integration-0.0.1-SNAPSHOT.jar --spring.profiles.active=master

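However, both applications simply shut down, and no messages are sent or received. I expected the worker application to wait for messages once it started. When the master application started, it would send three messages, and each incoming message would trigger the worker's flow, which would print it.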

Here are the logs from starting the worker and then the master application:

2023-10-26 15:05:45.494  INFO 12348 --- [           main] i.b.t.TestJmsIntegrationApplication      : The following 1 profile is active: "worker"
2023-10-26 15:05:45.843  INFO 12348 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-26 15:05:45.854  INFO 12348 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-26 15:05:47.156  INFO 12348 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:05:47.156  INFO 12348 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-26 15:05:47.157  INFO 12348 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-10-26 15:05:47.169  INFO 12348 --- [           main] i.b.t.TestJmsIntegrationApplication      : Started TestJmsIntegrationApplication in 1.989 seconds (JVM running for 2.403)
2023-10-26 15:05:47.176  INFO 12348 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:05:47.177  INFO 12348 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-26 15:05:47.178  INFO 12348 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'

=============================================================================================================================================================================

2023-10-26 15:06:16.995  INFO 12528 --- [           main] i.b.t.TestJmsIntegrationApplication      : The following 1 profile is active: "master"
2023-10-26 15:06:17.349  INFO 12528 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-26 15:06:17.360  INFO 12528 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-26 15:06:18.663  INFO 12528 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:06:18.664  INFO 12528 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-26 15:06:18.664  INFO 12528 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-10-26 15:06:18.673  INFO 12528 --- [           main] i.b.t.TestJmsIntegrationApplication      : Started TestJmsIntegrationApplication in 2.011 seconds (JVM running for 2.425)
2023-10-26 15:06:18.683  INFO 12528 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:06:18.683  INFO 12528 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-26 15:06:18.683  INFO 12528 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'

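I am still learning spring-integration and have only followed these sources:

  1. How to send a JMS message from one spring-boot application to another when both apps use embedded activemq
  2. Spring Integration samples - JMS

Any suggestions on how to achieve what I expect?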

Update

I made some changes, as follows. In application.properties:

activemq.broker-url=tcp://localhost:61616

I added the following bean to CommonConfig:

    @Bean
    public BrokerService broker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.addConnector("tcp://localhost:61616");
        return broker;
    }

and the following dependency to pom.xml:

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-kahadb-store</artifactId>
        </dependency>

But there is still no communication:

2023-10-27 15:18:19.236  INFO 24904 --- [           main] i.b.t.TestJmsIntegrationApplication      : The following 1 profile is active: "worker"
2023-10-27 15:18:19.630  INFO 24904 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-27 15:18:19.643  INFO 24904 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-27 15:18:20.129  INFO 24904 --- [           main] o.apache.activemq.broker.BrokerService   : Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\test-jms-integration\activemq-data\localhost\KahaDB]
2023-10-27 15:18:20.171  INFO 24904 --- [           main] o.a.a.store.kahadb.MessageDatabase       : KahaDB is version 7
2023-10-27 15:18:20.286  INFO 24904 --- [           main] o.a.a.store.kahadb.plist.PListStoreImpl  : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] started
2023-10-27 15:18:20.415  INFO 24904 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) is starting
2023-10-27 15:18:20.420  INFO 24904 --- [           main] o.a.a.t.TransportServerThreadSupport     : Listening for connections at: tcp://127.0.0.1:61616
2023-10-27 15:18:20.421  INFO 24904 --- [           main] o.a.activemq.broker.TransportConnector   : Connector tcp://127.0.0.1:61616 started
2023-10-27 15:18:20.421  INFO 24904 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) started
2023-10-27 15:18:20.422  INFO 24904 --- [           main] o.apache.activemq.broker.BrokerService   : For help or more information please see: http://activemq.apache.org
2023-10-27 15:18:21.314  INFO 24904 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:18:21.314  INFO 24904 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-27 15:18:21.316  INFO 24904 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-10-27 15:18:21.331  INFO 24904 --- [           main] i.b.t.TestJmsIntegrationApplication      : Started TestJmsIntegrationApplication in 2.423 seconds (JVM running for 2.828)
2023-10-27 15:18:21.336  INFO 24904 --- [MQ ShutdownHook] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) is shutting down
2023-10-27 15:18:21.337  INFO 24904 --- [MQ ShutdownHook] o.a.activemq.broker.TransportConnector   : Connector tcp://127.0.0.1:61616 stopped
2023-10-27 15:18:21.339  INFO 24904 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:18:21.339  INFO 24904 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-27 15:18:21.340  INFO 24904 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2023-10-27 15:18:21.341  INFO 24904 --- [MQ ShutdownHook] o.a.a.store.kahadb.plist.PListStoreImpl  : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] stopped
2023-10-27 15:18:21.342  INFO 24904 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore    : Stopping async queue tasks
2023-10-27 15:18:21.343  INFO 24904 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore    : Stopping async topic tasks

=======================================================================================================

2023-10-27 15:20:13.451  INFO 17960 --- [           main] i.b.t.TestJmsIntegrationApplication      : The following 1 profile is active: "master"
2023-10-27 15:20:13.807  INFO 17960 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-27 15:20:13.820  INFO 17960 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-27 15:20:14.282  INFO 17960 --- [           main] o.apache.activemq.broker.BrokerService   : Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\test-jms-integration\activemq-data\localhost\KahaDB]
2023-10-27 15:20:14.326  INFO 17960 --- [           main] o.a.a.store.kahadb.MessageDatabase       : KahaDB is version 7
2023-10-27 15:20:14.448  INFO 17960 --- [           main] o.a.a.store.kahadb.plist.PListStoreImpl  : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] started
2023-10-27 15:20:14.583  INFO 17960 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) is starting
2023-10-27 15:20:14.589  INFO 17960 --- [           main] o.a.a.t.TransportServerThreadSupport     : Listening for connections at: tcp://127.0.0.1:61616
2023-10-27 15:20:14.589  INFO 17960 --- [           main] o.a.activemq.broker.TransportConnector   : Connector tcp://127.0.0.1:61616 started
2023-10-27 15:20:14.589  INFO 17960 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) started
2023-10-27 15:20:14.589  INFO 17960 --- [           main] o.apache.activemq.broker.BrokerService   : For help or more information please see: http://activemq.apache.org
2023-10-27 15:20:15.453  INFO 17960 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:20:15.454  INFO 17960 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-27 15:20:15.456  INFO 17960 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2023-10-27 15:20:15.469  INFO 17960 --- [           main] i.b.t.TestJmsIntegrationApplication      : Started TestJmsIntegrationApplication in 2.35 seconds (JVM running for 2.758)
2023-10-27 15:20:15.474  INFO 17960 --- [MQ ShutdownHook] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.16.6 (localhost, ID:) is shutting down
2023-10-27 15:20:15.475  INFO 17960 --- [MQ ShutdownHook] o.a.activemq.broker.TransportConnector   : Connector tcp://127.0.0.1:61616 stopped
2023-10-27 15:20:15.476  INFO 17960 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:20:15.478  INFO 17960 --- [MQ ShutdownHook] o.a.a.store.kahadb.plist.PListStoreImpl  : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] stopped
2023-10-27 15:20:15.479  INFO 17960 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-27 15:20:15.480  INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore    : Stopping async queue tasks
2023-10-27 15:20:15.481  INFO 17960 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer       : stopped bean '_org.springframework.integration.errorLogger'
2023-10-27 15:20:15.481  INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore    : Stopping async topic tasks
2023-10-27 15:20:15.484  INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore    : Stopped KahaDB

Tags: spring-integration spring-jms

1 Answer

The vm: transport is only available within the same JVM; you need to enable the tcp: transport.
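
To make the distinction concrete, here is a minimal sketch using only the JDK. It classifies a broker URL by its scheme. The class name BrokerUrlCheck and the method reachableFromOtherJvm are hypothetical helpers invented for illustration; they are not part of ActiveMQ or Spring. It treats every scheme other than vm as usable across processes, which is a simplification.

```java
import java.net.URI;

// Hypothetical helper, not an ActiveMQ or Spring API.
class BrokerUrlCheck {

    /**
     * Returns false for vm: URLs, which only resolve to a broker inside
     * the current JVM, and true for any other scheme (simplification).
     */
    static boolean reachableFromOtherJvm(String brokerUrl) {
        String scheme = URI.create(brokerUrl).getScheme();
        return !"vm".equalsIgnoreCase(scheme);
    }

    public static void main(String[] args) {
        // The two URLs that appear in the question's application.properties
        System.out.println(reachableFromOtherJvm("vm://localhost"));        // false
        System.out.println(reachableFromOtherJvm("tcp://localhost:61616")); // true
    }
}
```

With vm://localhost, each of the two java -jar processes in the question gets its own private in-JVM broker, so a message sent by the master never reaches the worker's queue.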

See this answer: Is it possible to connect to a Spring Boot embedded ActiveMQ instance from another application (started in a separate process)?
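
Once one process exposes a tcp: connector, it can help to confirm from a second process that the port is actually accepting connections before debugging the JMS flows themselves. The following is a rough sketch with plain JDK sockets. BrokerPortProbe and isListening are hypothetical names, and port 61616 is taken from the question's tcp://localhost:61616 URL. The probe only shows that something is listening, not that it is an ActiveMQ broker.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper, not part of ActiveMQ or Spring.
class BrokerPortProbe {

    /** Returns true if a TCP connection to host:port is accepted within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Run this while the application hosting the broker is still up
        System.out.println("broker port open: " + isListening("localhost", 61616, 1000));
    }
}
```

In the logs from the update, each application stops its connector within about a second of starting. A probe like this would therefore only report an open port if the process hosting the broker stays alive.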
