假设我有一个方法,它接受一个参数并返回异步完成的Mono<Integer>
。例如:
Random random = new Random();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
Mono<Integer> fetch(String a) {
return Mono.create(em -> {
scheduledExecutorService.schedule(() -> em.next(a + " result"),
10 + random.nextInt(50), TimeUnit.MILLISECONDS);
});
}
假设我有一个Flux<String>
,我可以提供上面的fetch
方法,它可以有很多元素。
有没有办法可以确保方法并行调用,但是将并发调用的数量限制为预定义的数量?
例如。在上面的例子中,我有16个可用的线程 - 所以我总是从这个角度保留12个备用。
假设您使用flux.flatMap(this::fetch)
,那么您可以通过调用flux.flatMap(this::fetch, 4)
来设置flatMap并发。
此外,您的代码有两个编译错误:
Mono<Integer>
的返回类型与您给予接收器的项目类型(a + " result"
)不匹配。我认为你的意思是Mono<String>
.next
方法。我认为你的意思是.success
鉴于所有这些,这是一个例子:
private Flux<String> fetchAll() {
return Flux.range(0, 50)
.map(i -> Integer.toString(i))
.flatMap(this::fetch, 4);
}
private Mono<String> fetch(String a) {
return Mono.create(em ->
scheduledExecutorService.schedule(() -> em.success(a + " result"),
10 + random.nextInt(50), TimeUnit.MILLISECONDS)
);
}