我正在尝试使用 Spring 集成从一个 Spring Boot 应用程序(主程序)向另一个 Spring Boot 应用程序(工作程序)发送 Jms 消息。我使用嵌入式 ActiveMQ 作为代理。我做了以下事情:
MasterConfig
@Profile("master")
@EnableBatchProcessing
@EnableBatchIntegration
public class MasterConfig {
@Autowired
private JmsTemplate jmsTemplate;
@Bean
public MessageChannel inputChannel(){
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow jmsOutboundFlow(JmsTemplate jmsTemplate){
return IntegrationFlows.from(inputChannel())
.handle(Jms.outboundAdapter(jmsTemplate).destination("requestQueue"))
.get();
}
@Bean
public ApplicationRunner runner(){
return args -> {
for(int i = 1; i <= 3; i++)
inputChannel().send(MessageBuilder.withPayload("hello " + i).build());
};
}
}
WorkerConfig
:
@Profile("worker")
@EnableBatchIntegration
@EnableBatchProcessing
public class WorkerConfig {
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Bean
public IntegrationFlow jmsMessageDrivenFlow(){
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(cachingConnectionFactory).destination("requestQueue"))
.handle(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
})
.get();
}
}
CommonConfig
@Profile("worker | master")
@Configuration
@EnableIntegration
public class CommonConfig {
@Value("${activemq.broker-url}")
private String brokerUrl;
@Bean
public ConnectionFactory connectionFactory(){
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL(brokerUrl);
return cf;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory(){
return new CachingConnectionFactory(connectionFactory());
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate =
new JmsTemplate(cachingConnectionFactory());
jmsTemplate.setPubSubDomain(false);
return jmsTemplate;
}
@Bean
public Queue requestQueue(){
return new ActiveMQQueue("queue.demo");
}
@Bean
public Queue replyQueue(){
return new ActiveMQQueue("queue.reply");
}
}
pom.xml
中的依赖关系
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
application.properties
activemq.broker-url=vm://localhost
所以我使用
mvn clean package
命令编译了项目,然后使用不同的弹簧配置文件集运行 jar 两次java -jar target/test-jms-integration-0.0.1-SNAPSHOT.jar --spring.profiles.active=worker
java -jar target/test-jms-integration-0.0.1-SNAPSHOT.jar --spring.profiles.active=master
但是两个应用程序都只是关闭并且没有消息接收或发送。我期望的是,当工作应用程序启动时,它将等待任何消息,当主应用程序启动时,它将发送三个消息,并且在工作应用程序中将被传入消息触发并打印消息。
以下是我启动工作程序和主应用程序时发生的情况的日志:
2023-10-26 15:05:45.494 INFO 12348 --- [ main] i.b.t.TestJmsIntegrationApplication : The following 1 profile is active: "worker"
2023-10-26 15:05:45.843 INFO 12348 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-26 15:05:45.854 INFO 12348 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-26 15:05:47.156 INFO 12348 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:05:47.156 INFO 12348 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-26 15:05:47.157 INFO 12348 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-10-26 15:05:47.169 INFO 12348 --- [ main] i.b.t.TestJmsIntegrationApplication : Started TestJmsIntegrationApplication in 1.989 seconds (JVM running for 2.403)
2023-10-26 15:05:47.176 INFO 12348 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:05:47.177 INFO 12348 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-26 15:05:47.178 INFO 12348 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
=============================================================================================================================================================================
2023-10-26 15:06:16.995 INFO 12528 --- [ main] i.b.t.TestJmsIntegrationApplication : The following 1 profile is active: "master"
2023-10-26 15:06:17.349 INFO 12528 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-26 15:06:17.360 INFO 12528 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-26 15:06:18.663 INFO 12528 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:06:18.664 INFO 12528 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-26 15:06:18.664 INFO 12528 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-10-26 15:06:18.673 INFO 12528 --- [ main] i.b.t.TestJmsIntegrationApplication : Started TestJmsIntegrationApplication in 2.011 seconds (JVM running for 2.425)
2023-10-26 15:06:18.683 INFO 12528 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-26 15:06:18.683 INFO 12528 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-26 15:06:18.683 INFO 12528 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
我目前仍在学习 spring-integration,并且只遵循这些来源:
有什么建议可以实现我的预期吗?
我做了一些更改,如下: 在
application.properties
:
activemq.broker-url=tcp://localhost:61616
我在
CommonConfig
中添加以下bean:
@Bean
public BrokerService broker() throws Exception {
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
return broker;
}
以及以下对
pom.xml
的依赖:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
</dependency>
但仍然没有沟通:
2023-10-27 15:18:19.236 INFO 24904 --- [ main] i.b.t.TestJmsIntegrationApplication : The following 1 profile is active: "worker"
2023-10-27 15:18:19.630 INFO 24904 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-27 15:18:19.643 INFO 24904 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-27 15:18:20.129 INFO 24904 --- [ main] o.apache.activemq.broker.BrokerService : Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\test-jms-integration\activemq-data\localhost\KahaDB]
2023-10-27 15:18:20.171 INFO 24904 --- [ main] o.a.a.store.kahadb.MessageDatabase : KahaDB is version 7
2023-10-27 15:18:20.286 INFO 24904 --- [ main] o.a.a.store.kahadb.plist.PListStoreImpl : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] started
2023-10-27 15:18:20.415 INFO 24904 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) is starting
2023-10-27 15:18:20.420 INFO 24904 --- [ main] o.a.a.t.TransportServerThreadSupport : Listening for connections at: tcp://127.0.0.1:61616
2023-10-27 15:18:20.421 INFO 24904 --- [ main] o.a.activemq.broker.TransportConnector : Connector tcp://127.0.0.1:61616 started
2023-10-27 15:18:20.421 INFO 24904 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) started
2023-10-27 15:18:20.422 INFO 24904 --- [ main] o.apache.activemq.broker.BrokerService : For help or more information please see: http://activemq.apache.org
2023-10-27 15:18:21.314 INFO 24904 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:18:21.314 INFO 24904 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-27 15:18:21.316 INFO 24904 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-10-27 15:18:21.331 INFO 24904 --- [ main] i.b.t.TestJmsIntegrationApplication : Started TestJmsIntegrationApplication in 2.423 seconds (JVM running for 2.828)
2023-10-27 15:18:21.336 INFO 24904 --- [MQ ShutdownHook] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) is shutting down
2023-10-27 15:18:21.337 INFO 24904 --- [MQ ShutdownHook] o.a.activemq.broker.TransportConnector : Connector tcp://127.0.0.1:61616 stopped
2023-10-27 15:18:21.339 INFO 24904 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:18:21.339 INFO 24904 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-27 15:18:21.340 INFO 24904 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2023-10-27 15:18:21.341 INFO 24904 --- [MQ ShutdownHook] o.a.a.store.kahadb.plist.PListStoreImpl : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] stopped
2023-10-27 15:18:21.342 INFO 24904 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore : Stopping async queue tasks
2023-10-27 15:18:21.343 INFO 24904 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore : Stopping async topic tasks
=======================================================================================================
2023-10-27 15:20:13.451 INFO 17960 --- [ main] i.b.t.TestJmsIntegrationApplication : The following 1 profile is active: "master"
2023-10-27 15:20:13.807 INFO 17960 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2023-10-27 15:20:13.820 INFO 17960 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2023-10-27 15:20:14.282 INFO 17960 --- [ main] o.apache.activemq.broker.BrokerService : Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\test-jms-integration\activemq-data\localhost\KahaDB]
2023-10-27 15:20:14.326 INFO 17960 --- [ main] o.a.a.store.kahadb.MessageDatabase : KahaDB is version 7
2023-10-27 15:20:14.448 INFO 17960 --- [ main] o.a.a.store.kahadb.plist.PListStoreImpl : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] started
2023-10-27 15:20:14.583 INFO 17960 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) is starting
2023-10-27 15:20:14.589 INFO 17960 --- [ main] o.a.a.t.TransportServerThreadSupport : Listening for connections at: tcp://127.0.0.1:61616
2023-10-27 15:20:14.589 INFO 17960 --- [ main] o.a.activemq.broker.TransportConnector : Connector tcp://127.0.0.1:61616 started
2023-10-27 15:20:14.589 INFO 17960 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) started
2023-10-27 15:20:14.589 INFO 17960 --- [ main] o.apache.activemq.broker.BrokerService : For help or more information please see: http://activemq.apache.org
2023-10-27 15:20:15.453 INFO 17960 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:20:15.454 INFO 17960 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2023-10-27 15:20:15.456 INFO 17960 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2023-10-27 15:20:15.469 INFO 17960 --- [ main] i.b.t.TestJmsIntegrationApplication : Started TestJmsIntegrationApplication in 2.35 seconds (JVM running for 2.758)
2023-10-27 15:20:15.474 INFO 17960 --- [MQ ShutdownHook] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.16.6 (localhost, ID:) is shutting down
2023-10-27 15:20:15.475 INFO 17960 --- [MQ ShutdownHook] o.a.activemq.broker.TransportConnector : Connector tcp://127.0.0.1:61616 stopped
2023-10-27 15:20:15.476 INFO 17960 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2023-10-27 15:20:15.478 INFO 17960 --- [MQ ShutdownHook] o.a.a.store.kahadb.plist.PListStoreImpl : PListStore:[C:\Users\test-jms-integration\activemq-data\localhost\tmp_storage] stopped
2023-10-27 15:20:15.479 INFO 17960 --- [ionShutdownHook] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s).
2023-10-27 15:20:15.480 INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore : Stopping async queue tasks
2023-10-27 15:20:15.481 INFO 17960 --- [ionShutdownHook] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2023-10-27 15:20:15.481 INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore : Stopping async topic tasks
2023-10-27 15:20:15.484 INFO 17960 --- [MQ ShutdownHook] o.a.activemq.store.kahadb.KahaDBStore : Stopped KahaDB