我有一个消息生成器,它每秒产生大约15条消息
使用者是一个Spring集成项目,它从Message Queue进行消耗并进行大量处理。我已经使用Executor通道并行处理消息,然后流程通过一些常见的处理程序类。
请在下面的代码段中找到-
baseEventFlow()
-我们从EMS队列接收消息并将其发送到路由器]router()
-基于消息的ID”,为一个特定的ExecutorChannel实例配置一个单线程Executor。每个ExecutorChannel都将成为仅具有单个线程的专用执行程序。skwDefaultChannel(), gjsucaDefaultChannel(), rpaDefaultChannel()
-所有ExecutorChannel Bean都以@BridgeTo标记为同一通道,该通道开始该公共流程。 uaEventFlow()
-在这里每条消息都会得到处理]@Bean
public IntegrationFlow baseEventFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)
.route(router()).get();
}
public AbstractMessageRouter router() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
if (message.getPayload().toString().contains("\"id\":\"RPA")) {
return Collections.singletonList(skwDefaultChannel());
}else if (message.getPayload().toString().contains("\"id\":\"ASH")) {
return Collections.singletonList(rpaDefaultChannel());
} else if (message.getPayload().toString().contains("\"id\":\"GJS")
|| message.getPayload().toString().contains("\"id\":\"UCA")) {
return Collections.singletonList(gjsucaDefaultChannel());
} else {
return Collections.singletonList(new NullChannel());
}
}
};
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel skwDefaultChannel() {
return MessageChannels.executor(SKW_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel gjsucaDefaultChannel() {
return MessageChannels.executor(GJS_UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel rpaDefaultChannel() {
return MessageChannels.executor(RPA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
public IntegrationFlow uaEventFlow() {
return IntegrationFlows.from("uaDefaultChannel")
.wireTap(UA_WIRE_TAP_CHNL)
.transform(eventHandler, "parseEvent")
.handle(uaImpl, "process").get();
}
我担心的是,在uaEVentFlow()中,常见的转换和处理程序方法不是线程安全的,可能会引起问题。如何确保每次调用消息时都注入新的转换器和处理程序?我应该将转换器和处理程序bean的范围更改为原型吗?
我有一个消息生成器,它每秒产生约15条消息。消费者是一个Spring集成项目,它从Message Queue中进行消耗并进行大量处理。我用过...
而不是桥接到通用流,您应该将.transform()
和.handle()
移动到每个上游流并添加