按并发任务数量(非基于时间)限制异步单声道

问题描述 投票:1回答:1

假设我有一个方法,它接受一个参数并返回异步完成的Mono<Integer>。例如:

Random random = new Random();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);

Mono<Integer> fetch(String a) {
  return Mono.create(em -> {
    scheduledExecutorService.schedule(() -> em.next(a + " result"), 
      10 + random.nextInt(50), TimeUnit.MILLISECONDS);
  });
}

假设我有一个Flux<String>,我可以提供上面的fetch方法,它可以有很多元素。

有没有办法可以确保方法并行调用,但是将并发调用的数量限制为预定义的数量?

例如。在上面的例子中,我有16个可用的线程 - 所以我总是从这个角度保留12个备用。

java project-reactor
1个回答
2
投票

假设您使用flux.flatMap(this::fetch),那么您可以通过调用flux.flatMap(this::fetch, 4)来设置flatMap并发。

此外,您的代码有两个编译错误:

  1. fetch Mono<Integer>的返回类型与您给予接收器的项目类型(a + " result")不匹配。我认为你的意思是Mono<String>
  2. MonoSink没有.next方法。我认为你的意思是.success

鉴于所有这些,这是一个例子:

    private Flux<String> fetchAll() {
        return Flux.range(0, 50)
                .map(i -> Integer.toString(i))
                .flatMap(this::fetch, 4);

    }

    private Mono<String> fetch(String a) {
        return Mono.create(em ->
                scheduledExecutorService.schedule(() -> em.success(a + " result"),
                        10 + random.nextInt(50), TimeUnit.MILLISECONDS)
        );
    }

© www.soinside.com 2019 - 2024. All rights reserved.