带有JMS侦听器和任务执行器和断路器的Spring Integration Java DSL

问题描述 投票:0回答:1

我对Spring Integration Java DSL比较陌生。我想实现一个非常简单的场景,当前由Spring Boot应用程序完成

  1. 从JMS队列(即ActiveMQ)中检索消息。我想使用taskExecutor为多线程方法应用可配置的池大小]
  2. 使用XsltTransformer将有效负载转换为xml格式。万一变压器出现错误,我想停止链条(不需要继续执行步骤3)
  3. 调用其他通道以通过休息将xml有效负载发送到我的核心应用程序。在这一部分中,我想应用断路器,以防通过rest调用我的核心应用程序时出现任何错误

目前我的逻辑很简单

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
    return Pollers.fixedRate(1000)
            .maxMessagesPerPoll(10).get();
}

 @Bean
public TaskExecutor taskExecutor() {
    int poolSize = 10;
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    LOGGER.debug("...... creating ThreadPool of size {}.", poolSize);
    executor.setThreadNamePrefix("XmlTaskExecutor_thread_");
    executor.setMaxPoolSize(100);
    executor.setCorePoolSize(poolSize);
    executor.setQueueCapacity(10);
    return executor;
}

@Bean
public IntegrationFlow jmsMessageDrivenFlowWithContainer() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(
                    Jms.container(this.jmsConnectionFactory, recordDestinationQueue)
                            .taskExecutor(this.taskExecutor())))
                    .transform(xsltTransformer, "doTransform")
                    .channel(this.handleChannel())
                    .get();
}

@Bean
@ServiceActivator(inputChannel = "handleChannel")
public RecordSenderHandler recordSenderHandler() {
    return new RecordSenderHandler();
}

任何人都可以帮助我采用哪种最佳方法来处理我的案件。谢谢

spring-jms spring-integration-dsl
1个回答
0
投票

想使用taskExecutor为多线程方法应用可配置的池大小

这是错误的方法;在将消息传递到另一个线程时,您可能会丢失消息。

要增加并发性,您需要增加侦听器容器上的并发性。

您还将混合DSL和Java配置。

使用

IntegrationFlows.from(Jms.messageDrivenChannelAdapter(...))
    .transform(...)
    ...

代替。

编辑

不要在此处的容器规范上使用.get(),它将无法正确初始化。框架需要创建bean。

此外,您仍然需要在容器中添加并发。

向适配器添加errorChannel以处理错误。

IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination)
                    .concurrentConsumers(5))
                .errorChannel(someErrorChannel))
            .transform(xsltTransformer, "doTransform")
            ...

© www.soinside.com 2019 - 2024. All rights reserved.