执行程序通道中的线程安全

问题描述 投票:0回答:1

我有一个消息生成器,它每秒产生大约15条消息

使用者是一个Spring集成项目,它从Message Queue进行消耗并进行大量处理。我已经使用Executor通道并行处理消息,然后流程通过一些常见的处理程序类。

请在下面的代码段中找到-

  1. [baseEventFlow()-我们从EMS队列接收消息并将其发送到路由器]
  2. router()-基于消息的ID”,为一个特定的ExecutorChannel实例配置一个单线程Executor。每个ExecutorChannel都将成为仅具有单个线程的专用执行程序。
  3. skwDefaultChannel(), gjsucaDefaultChannel(), rpaDefaultChannel()-所有ExecutorChannel Bean都以@BridgeTo标记为同一通道,该通道开始该公共流程。
  4. [uaEventFlow()-在这里每条消息都会得到处理]
  5. @Bean
    public IntegrationFlow baseEventFlow() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
                .wireTap(FLTAWARE_WIRE_TAP_CHNL)
                .route(router()).get();
    }
    
    public AbstractMessageRouter router() {
        return new AbstractMessageRouter() {
            @Override
            protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
                if (message.getPayload().toString().contains("\"id\":\"RPA")) {
                    return Collections.singletonList(skwDefaultChannel());
                }else if (message.getPayload().toString().contains("\"id\":\"ASH")) {
                    return Collections.singletonList(rpaDefaultChannel());
                } else if (message.getPayload().toString().contains("\"id\":\"GJS")
                        || message.getPayload().toString().contains("\"id\":\"UCA")) {
                    return Collections.singletonList(gjsucaDefaultChannel());
                } else {
                    return Collections.singletonList(new NullChannel());
                }
            }
        };
    }
    
    @Bean
    @BridgeTo("uaDefaultChannel")
    public MessageChannel skwDefaultChannel() {
        return MessageChannels.executor(SKW_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
    }
    
    @Bean
    @BridgeTo("uaDefaultChannel")
    public MessageChannel gjsucaDefaultChannel() {
        return MessageChannels.executor(GJS_UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
    }
    
    @Bean
    @BridgeTo("uaDefaultChannel")
    public MessageChannel rpaDefaultChannel() {
        return MessageChannels.executor(RPA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
    }
    
    @Bean
    public IntegrationFlow uaEventFlow() {
        return IntegrationFlows.from("uaDefaultChannel")
                 .wireTap(UA_WIRE_TAP_CHNL)
                 .transform(eventHandler, "parseEvent")
                 .handle(uaImpl, "process").get();
    }
    

我担心的是,在uaEVentFlow()中,常见的转换和处理程序方法不是线程安全的,可能会引起问题。如何确保每次调用消息时都注入新的转换器和处理程序?我应该将转换器和处理程序bean的范围更改为原型吗?

我有一个消息生成器,它每秒产生约15条消息。消费者是一个Spring集成项目,它从Message Queue中进行消耗并进行大量处理。我用过...

spring-integration spring-integration-dsl
1个回答
0
投票

而不是桥接到通用流,您应该将.transform().handle()移动到每个上游流并添加

© www.soinside.com 2019 - 2024. All rights reserved.