Reactor Flux 保持阻塞,直到收到服务器响应

问题描述 投票:0回答:1

我创建了一个 Flux,它在订阅时发出 API 响应。 API 需要几秒钟的时间来响应。我还将流程切换到具有 2 个线程的调度程序。

private Scheduler sc = Schedulers.newBoundedElastic(2, Integer.MAX_VALUE, "newwww-sch");

Flux.just(i)
        .flatMap(x -> {
          try {
            System.out.println(Thread.currentThread().getName() + " Do Request " + i);
            return Mono.just(spectrumAvailabilityApi.getSpectrumAvailability(spectrumAvailabilityRequest));
          } catch (ApiException e) {
            throw new RuntimeException(e);
          }
        })
    .subscribeOn(sc)
    .subscribe(s -> System.out.println(Thread.currentThread().getName() + " Got Response " + i));

当我使用并行线程触发流程时,我注意到它提交了 2 个请求,然后什么都不做,直到它从服务器获取至少一个请求的响应。

newwww-sch-1 Do Request 11
newwww-sch-2 Do Request 12
newwww-sch-2 Got Response 12
newwww-sch-1 Got Response 11
newwww-sch-2 Do Request 13
newwww-sch-1 Do Request 14
newwww-sch-1 Got Response 14
newwww-sch-2 Got Response 13

此外,提交请求的线程与处理请求响应的线程是同一线程。这是预期的吗?我的期望是,只要调度程序线程可用或线程正在等待服务器的响应,它就会提交请求。

spring-boot mono spring-webflux project-reactor flux
1个回答
0
投票

你是对的,只要你有调度程序线程,它就会提交,但是你的调度程序有两个线程,这就是为什么你只有两个并行的原因。

您使用

.subscribeOn(sc)
指定了订阅线程,它用于提交,并且由于您没有为内部
flatMap
订阅指定任何其他内容,因此它会重用现有线程。如果您想将它们分开,并增加用于实际请求的线程数,请在另一个调度程序上订阅它,例如:

.flatMap(x -> 
      Mono.fromCallable {
        System.out.println(Thread.currentThread().getName() + " Do Request " + i);
        return spectrumAvailabilityApi.getSpectrumAvailability(spectrumAvailabilityRequest);
      }.subscribeOn(Schedulers.boundedElastic())
)

顺便说一句,问题之一是您的

flatMap
正在阻塞,因为它等待
spectrumAvailabilityApi.getSpectrumAvailability(spectrumAvailabilityRequest)
执行。

© www.soinside.com 2019 - 2024. All rights reserved.