我对 Spring Injection 完全陌生。我正在寻找以下逻辑:
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlow.from(Constants.INBOUND_CHANNEL)
.bridge(e -> e.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(5).taskExecutor(taskExecutor1())))
.log()
.aggregate(a -> a.correlationStrategy(m -> m != null).releaseStrategy(g -> g.size() == 2)
.expireGroupsUponCompletion(true).expireGroupsUponTimeout(true).async(true))
.log().handle(a -> {
//Add code here to send the data to Http end point which takes around 5 secs.
System.out.println("Thread Sleeping on ThreadId : " + Thread.currentThread().getId());
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}, e -> e.async(true)).get();
}
@Bean
TaskExecutor taskExecutor1() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(2);
return taskExecutor;
}
@Bean(name = Constants.INBOUND_CHANNEL)
public QueueChannel inboundFilePollingChannel() {
return new QueueChannel(100);
}
上面的代码运行良好。但总体逻辑看起来是按顺序发生的。在聚合组被释放之前,任务执行器的其他线程不会挑选消息进行处理,也不会将消息发送到 Http Endpoint。
关于方法有什么提示吗?
首先,如果服务激活器未返回
async(true)
或响应式流 CompletableFuture
,则 Publisher
没有帮助。
那么你就会遇到这个问题:
correlationStrategy(m -> m != null)
,对于您发送到此聚合器的所有消息,它总是返回 true
。 AggregatingMessageHandler
的进一步逻辑是这样的:
UUID groupIdUuid = UUIDConverter.getUUID(correlationKey);
Lock lock = this.lockRegistry.obtain(groupIdUuid.toString());
在我们处理完当前消息之前,它会被锁定。因此,即使您从不同的线程生成此聚合器,您仍然可以按顺序和阻塞的方式进行聚合。只是因为您根据关联策略对所有消息使用相同的
Lock
实例。
还有一个问题:看起来不像是并行调用 HTTP 服务。您需要考虑在
ExecutorChannel
和 aggregate()
之间添加 handle()
。这样,聚合器的结果(两条消息)将生成到另一个线程,并且当前线程将被释放以处理下一条消息以进行聚合。
我们没有开箱即用的组件来按照消息到达的顺序发出大量消息:我们只有基于相关键的逻辑:
AggregatingMessageHandler
、ResequencingMessageHandler
、CorrelatingMessageBarrier
和 FluxAggregatorMessageHandler
。
我认为您仍然可以依赖聚合器中使用通用相关键的顺序释放,但是您不需要在轮询器上使用
taskExecutor
,因为它没有任何效果,而是在之后使用 channel(c -> c.executor())
聚合器。这样,您将看到对 HTTP 服务的并行调用,每个调用有 2 条消息。