SubscribeOn或PublishOn不适用于ReactiveCassandraCrudRepository

问题描述 投票:0回答:1

[我们正在使用带有Spring WebFlux的Reactive Spring Data Repository,我对SubscribeOn的理解是,它决定在SubscribeOn将在流中执行操作之前,运算符将在哪个ThreadPool上确定,而PublishOn确定将在其上执行订阅的实际ThreadPool。但是,即使在使用PublishOn和SubscribeOn的以下代码中,代码也不会在主线程上执行,而是回落到Cluster-nio-worker-1。

    System.out.println("Current Thread :- "+Thread.currentThread().getName()); //Current Thread :- main
    personRepository.findAll().log()
            .map(document -> mapDocumentToSomethingElse(document)) //Current thread cluster-nio-worker-1
            .subscribeOn(Schedulers.immediate())
            .publishOn(Schedulers.immediate())
            .subscribe(trackingevent -> System.out.println("Got Item "+item +" inside thread "+Thread.currentThread()), //Thread[cluster-nio-worker-1,5,main]
                    excp -> excp.printStackTrace(),
                    () -> System.out.println("Completed processing Thread:- "+Thread.currentThread().getName())); //cluster-nio-worker-1

还有Thread [cluster-nio-worker-1,5,main]是什么意思?为什么这些方法调用未使用主线程执行。

cassandra reactive-programming spring-webflux project-reactor spring-data-cassandra
1个回答
0
投票

subscribeOn方法使发布者使用给定的线程池发布值。流水线中可能有N个subscribeOn方法。最简单的将生效。 personRepository.findAll().log()是包装器,并返回通量。因此,如果它在内部使用任何调度程序,则不能使用subscribeOn对其进行更改。例如,interval方法使用并行,而我不能将其更改为boundedElastic,如下所示。

    Flux.interval(Duration.ofSeconds(1))
            .subscribeOn(Schedulers.boundedElastic())

Schedulers.immediate仅将管道执行保持在同一线程中。它不是主要的,在您的情况下,它将是cluster-nio-worker线程池。

© www.soinside.com 2019 - 2024. All rights reserved.