我遇到了 quarkus 应用程序的问题,我们从 RestEasy 端点进入,并希望在下游进行一组阻塞调用,以加快我们希望同时运行这些阻塞任务的执行速度。目前解决方案如下所示:
public List<Trade> validateTrades(final List<TradeRequest<TradeRequestDetails>> sendTrades) {
// Convert the list of TradeRequest to a Multi
return Multi.createFrom().iterable(sendTrades)
// Use runSubscriptionOn to specify the executor for subscription handling
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
// Map each TradeRequest to a Uni<Trade> by validating the trade asynchronously
.onItem().transformToUniAndMerge(tradeRequest ->
Uni.createFrom().item(() -> validateTrade(tradeRequest))
)
// Collect the results into a list
.collect().asList()
.await().atMost(Duration.ofMinutes(2));
}
我们看到的是每个
validateTrade(tradeRequest)
仍然是串行执行器,我尝试过使用不同的执行器,例如Infrastructure.getDefaultWorkerPool()
或:
@Inject
@ManagedExecutorConfig(maxAsync = 5, maxQueued = 5)
@NamedInstance("tradeManagedExecutor")
ManagedExecutor managedExecutor;
如果我能在这方面获得任何帮助,我将不胜感激,也许我只是不理解 Mutiny API 或者它与 Quarkus 的集成。
需要注意的是,
validateTrade(tradeRequest)
还需要使用应用程序上下文中的JWT,因此我们要求将此上下文传播到工作线程。
因为你的执行路径是串行的 (对列表的元素进行一一处理)
.onItem().transformToUniAndMerge(tradeRequest ->
Uni.createFrom().item(() -> validateTrade(tradeRequest))
也许使用并行流作为替代方案
sendTrades.parallelStream().forEach()