我创建了一个 Flux,它在订阅时发出 API 响应。 API 需要几秒钟的时间来响应。我还将流程切换到具有 2 个线程的调度程序。
private Scheduler sc = Schedulers.newBoundedElastic(2, Integer.MAX_VALUE, "newwww-sch");
Flux.just(i)
.flatMap(x -> {
try {
System.out.println(Thread.currentThread().getName() + " Do Request " + i);
return Mono.just(spectrumAvailabilityApi.getSpectrumAvailability(spectrumAvailabilityRequest));
} catch (ApiException e) {
throw new RuntimeException(e);
}
})
.subscribeOn(sc)
.subscribe(s -> System.out.println(Thread.currentThread().getName() + " Got Response " + i));
当我使用并行线程触发流程时,我注意到它提交了 2 个请求,然后什么都不做,直到它从服务器获取至少一个请求的响应。
newwww-sch-1 Do Request 11
newwww-sch-2 Do Request 12
newwww-sch-2 Got Response 12
newwww-sch-1 Got Response 11
newwww-sch-2 Do Request 13
newwww-sch-1 Do Request 14
newwww-sch-1 Got Response 14
newwww-sch-2 Got Response 13
此外,提交请求的线程与处理请求响应的线程是同一线程。这是预期的吗?我的期望是,只要调度程序线程可用或线程正在等待服务器的响应,它就会提交请求。
你是对的,只要你有调度程序线程,它就会提交,但是你的调度程序有两个线程,这就是为什么你只有两个并行的原因。
您使用
.subscribeOn(sc)
指定了订阅线程,它用于提交,并且由于您没有为内部 flatMap
订阅指定任何其他内容,因此它会重用现有线程。如果您想将它们分开,并增加用于实际请求的线程数,请在另一个调度程序上订阅它,例如:
.flatMap(x ->
Mono.fromCallable {
System.out.println(Thread.currentThread().getName() + " Do Request " + i);
return spectrumAvailabilityApi.getSpectrumAvailability(spectrumAvailabilityRequest);
}.subscribeOn(Schedulers.boundedElastic())
)
顺便说一句,问题之一是您的
flatMap
正在阻塞,因为它等待 spectrumAvailabilityApi.getSpectrumAvailability(spectrumAvailabilityRequest)
执行。