我对Spring Integration Java DSL比较陌生。我想实现一个非常简单的场景,当前由Spring Boot应用程序完成
目前我的逻辑很简单
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedRate(1000)
.maxMessagesPerPoll(10).get();
}
@Bean
public TaskExecutor taskExecutor() {
int poolSize = 10;
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
LOGGER.debug("...... creating ThreadPool of size {}.", poolSize);
executor.setThreadNamePrefix("XmlTaskExecutor_thread_");
executor.setMaxPoolSize(100);
executor.setCorePoolSize(poolSize);
executor.setQueueCapacity(10);
return executor;
}
@Bean
public IntegrationFlow jmsMessageDrivenFlowWithContainer() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(
Jms.container(this.jmsConnectionFactory, recordDestinationQueue)
.taskExecutor(this.taskExecutor())))
.transform(xsltTransformer, "doTransform")
.channel(this.handleChannel())
.get();
}
@Bean
@ServiceActivator(inputChannel = "handleChannel")
public RecordSenderHandler recordSenderHandler() {
return new RecordSenderHandler();
}
任何人都可以帮助我采用哪种最佳方法来处理我的案件。谢谢
想使用taskExecutor为多线程方法应用可配置的池大小
这是错误的方法;在将消息传递到另一个线程时,您可能会丢失消息。
要增加并发性,您需要增加侦听器容器上的并发性。
您还将混合DSL和Java配置。
使用
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(...))
.transform(...)
...
代替。
编辑
不要在此处的容器规范上使用.get()
,它将无法正确初始化。框架需要创建bean。
此外,您仍然需要在容器中添加并发。
向适配器添加errorChannel
以处理错误。
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination)
.concurrentConsumers(5))
.errorChannel(someErrorChannel))
.transform(xsltTransformer, "doTransform")
...