同时运行阻塞操作

问题描述 投票:0回答:1

我遇到了 quarkus 应用程序的问题,我们从 RestEasy 端点进入,并希望在下游进行一组阻塞调用,以加快我们希望同时运行这些阻塞任务的执行速度。目前解决方案如下所示:

    public List<Trade> validateTrades(final List<TradeRequest<TradeRequestDetails>> sendTrades) {
        // Convert the list of TradeRequest to a Multi
        return Multi.createFrom().iterable(sendTrades)
                // Use runSubscriptionOn to specify the executor for subscription handling
                .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
                // Map each TradeRequest to a Uni<Trade> by validating the trade asynchronously
                .onItem().transformToUniAndMerge(tradeRequest ->
                        Uni.createFrom().item(() -> validateTrade(tradeRequest))
                )
                // Collect the results into a list
                .collect().asList()
                .await().atMost(Duration.ofMinutes(2));
    }

我们看到的是每个

validateTrade(tradeRequest)
仍然是串行执行器,我尝试过使用不同的执行器,例如
Infrastructure.getDefaultWorkerPool()
或:

    @Inject
    @ManagedExecutorConfig(maxAsync = 5, maxQueued = 5)
    @NamedInstance("tradeManagedExecutor")
    ManagedExecutor managedExecutor;

如果我能在这方面获得任何帮助,我将不胜感激,也许我只是不理解 Mutiny API 或者它与 Quarkus 的集成。

需要注意的是,

validateTrade(tradeRequest)
还需要使用应用程序上下文中的JWT,因此我们要求将此上下文传播到工作线程。

java quarkus resteasy mutiny
1个回答
0
投票

因为你的执行路径是串行的 (对列表的元素进行一一处理)

.onItem().transformToUniAndMerge(tradeRequest ->
Uni.createFrom().item(() -> validateTrade(tradeRequest))

也许使用并行流作为替代方案

sendTrades.parallelStream().forEach()
© www.soinside.com 2019 - 2024. All rights reserved.