[我们正在使用带有Spring WebFlux的Reactive Spring Data Repository,我对SubscribeOn的理解是,它决定在SubscribeOn将在流中执行操作之前,运算符将在哪个ThreadPool上确定,而PublishOn确定将在其上执行订阅的实际ThreadPool。但是,即使在使用PublishOn和SubscribeOn的以下代码中,代码也不会在主线程上执行,而是回落到Cluster-nio-worker-1。
System.out.println("Current Thread :- "+Thread.currentThread().getName()); //Current Thread :- main
personRepository.findAll().log()
.map(document -> mapDocumentToSomethingElse(document)) //Current thread cluster-nio-worker-1
.subscribeOn(Schedulers.immediate())
.publishOn(Schedulers.immediate())
.subscribe(trackingevent -> System.out.println("Got Item "+item +" inside thread "+Thread.currentThread()), //Thread[cluster-nio-worker-1,5,main]
excp -> excp.printStackTrace(),
() -> System.out.println("Completed processing Thread:- "+Thread.currentThread().getName())); //cluster-nio-worker-1
还有Thread [cluster-nio-worker-1,5,main]是什么意思?为什么这些方法调用未使用主线程执行。
subscribeOn方法使发布者使用给定的线程池发布值。流水线中可能有N个subscribeOn
方法。最简单的将生效。 personRepository.findAll().log()
是包装器,并返回通量。因此,如果它在内部使用任何调度程序,则不能使用subscribeOn对其进行更改。例如,interval
方法使用并行,而我不能将其更改为boundedElastic,如下所示。
Flux.interval(Duration.ofSeconds(1))
.subscribeOn(Schedulers.boundedElastic())
Schedulers.immediate
仅将管道执行保持在同一线程中。它不是主要的,在您的情况下,它将是cluster-nio-worker
线程池。