Spring Integration DSL:仅根据消息计数应用聚合器并并行发送到 Http 端点

问题描述 投票:0回答:1

我对 Spring Injection 完全陌生。我正在寻找以下逻辑:

  1. 消息被发布到队列通道(这可以更改)。我只想节流
  2. 使用 Spring 注入聚合队列中的 2 条消息(不需要关联)并将其作为单个消息发送到 Http 端点。 http 端点有时可能很慢。因此我想要 10 个并行线程并行处理消息。每个线程从队列中选取 2 条消息并发送给 Http 端点。

@Bean
public IntegrationFlow myFlow() {
       return IntegrationFlow.from(Constants.INBOUND_CHANNEL)
        .bridge(e ->         e.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(5).taskExecutor(taskExecutor1())))
        .log()
        .aggregate(a -> a.correlationStrategy(m -> m != null).releaseStrategy(g -> g.size() == 2)
                .expireGroupsUponCompletion(true).expireGroupsUponTimeout(true).async(true))
        .log().handle(a -> {
            //Add code here to send the data to Http end point which takes around 5 secs.
            System.out.println("Thread Sleeping on ThreadId : " + Thread.currentThread().getId());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }, e -> e.async(true)).get();
}

    @Bean
    TaskExecutor taskExecutor1() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        return taskExecutor;
    }

  @Bean(name = Constants.INBOUND_CHANNEL)
  public QueueChannel inboundFilePollingChannel() {
      return new QueueChannel(100);
  }


上面的代码运行良好。但总体逻辑看起来是按顺序发生的。在聚合组被释放之前,任务执行器的其他线程不会挑选消息进行处理,也不会将消息发送到 Http Endpoint。

关于方法有什么提示吗?

java spring-integration spring-integration-dsl
1个回答
0
投票

首先,如果服务激活器未返回

async(true)
或响应式流
CompletableFuture
,则
Publisher
没有帮助。

那么你就会遇到这个问题:

correlationStrategy(m -> m != null)
,对于您发送到此聚合器的所有消息,它总是返回
true
AggregatingMessageHandler
的进一步逻辑是这样的:

    UUID groupIdUuid = UUIDConverter.getUUID(correlationKey);
    Lock lock = this.lockRegistry.obtain(groupIdUuid.toString());

在我们处理完当前消息之前,它会被锁定。因此,即使您从不同的线程生成此聚合器,您仍然可以按顺序和阻塞的方式进行聚合。只是因为您根据关联策略对所有消息使用相同的

Lock
实例。

还有一个问题:看起来不像是并行调用 HTTP 服务。您需要考虑在

ExecutorChannel
aggregate()
之间添加
handle()
。这样,聚合器的结果(两条消息)将生成到另一个线程,并且当前线程将被释放以处理下一条消息以进行聚合。

我们没有开箱即用的组件来按照消息到达的顺序发出大量消息:我们只有基于相关键的逻辑:

AggregatingMessageHandler
ResequencingMessageHandler
CorrelatingMessageBarrier
 FluxAggregatorMessageHandler

我认为您仍然可以依赖聚合器中使用通用相关键的顺序释放,但是您不需要在轮询器上使用

taskExecutor
,因为它没有任何效果,而是在之后使用
channel(c -> c.executor())
聚合器。这样,您将看到对 HTTP 服务的并行调用,每个调用有 2 条消息。

© www.soinside.com 2019 - 2024. All rights reserved.