Project Reactor 中是否有“默认”调度程序?哪一个? “默认”是指当链没有调用
subscribeOn()
或 publishOn()
时使用的默认值。
默认情况下,数据生产过程从发起订阅的
Thread
开始。处理时间的运算符(例如 Mono.delay
)将默认在 Schedulers.parallel()
调度程序上运行。
大多数反应式库(Reactive Redis、Mongo 等)都会使用
parallel
作为默认调度程序。
例如 Spring WebFlux,通常使用 Reactor Netty 作为默认嵌入式服务器,并会在
Schedulers.parallel()
上发起订阅。
Reactor 项目中没有类似“默认调度程序”的东西。这取决于许多因素,除非您仔细指定调度程序,否则您不应该使代码逻辑依赖于任何假设。
(我的答案或多或少与@Alex的相同,但我试图根据一些权威来源来获得它。)
Reactor 文档这么说
Reactor...可以被认为是并发无关的。也就是说,它不强制执行并发模型。相反,它让你(开发人员)来指挥。但是,这并不妨碍该库帮助您处理并发性。
获得
或Flux
并不一定意味着它运行在专用线程中。相反,大多数运算符继续在前一个运算符执行的线程中工作。Mono
除非指定,最顶层的运算符(源)本身在进行调用的线程上运行。subscribe()
因此,无需任何干预并使用“标准”运算符,例如
map()
、flatMap()
或 filter()
,subscribe()
在调用线程上运行。
某些操作员默认使用
中的特定调度程序(并且通常允许您选择提供不同的调度程序)。例如,调用Schedulers
工厂方法会生成每 300 毫秒滴答一次的Flux.interval(Duration.ofMillis(300))
。默认情况下,此功能由Flux<Long>
启用。Schedulers.parallel()
因此,只需调用 some 运算符即可更改默认线程。该方法的文档中总是提到它。例如。对于 Flux.interval(Duration):
在
调度程序上运行。Schedulers.parallel()
但是如果您愿意,您可以更改它:
以下行将 Scheduler 更改为类似于 Schedulers.single() 的新实例:
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
似乎每个在不同调度程序上运行的
Mono
和 Flux
方法都有一个带有额外 Scheduler
参数的重载替代方案。
Spring WebFlux 文档告诉我们:
您应该在使用 Spring WebFlux 运行的服务器上看到哪些线程?
在“普通”Spring WebFlux 服务器上(例如,没有数据访问或其他可选依赖项),您可以预期服务器有一个线程,其他几个线程用于请求处理(通常与 CPU 核心的数量一样多)。然而,Servlet 容器可能会以更多线程启动(例如,Tomcat 上有 10 个线程),以支持 servlet(阻塞)I/O 和 servlet 3.1(非阻塞)I/O 使用。
反应式 WebClient 以事件循环方式运行。因此,您可以看到与之相关的少量固定数量的处理线程(例如,带有 Reactor Netty 连接器的reactor-http-nio-)。但是,如果客户端和服务器都使用 Reactor Netty,则默认情况下两者共享事件循环资源。
Reactor 和 RxJava 提供了线程池抽象,称为调度程序,与用于将处理切换到不同线程池的publishOn 运算符一起使用。调度程序的名称暗示了特定的并发策略——例如,“并行”(用于具有有限数量线程的 CPU 密集型工作)或“弹性”(用于具有大量线程的 I/O 密集型工作)。如果您看到这样的线程,则意味着某些代码正在使用特定的线程池调度程序策略。
数据访问库和其他第三方依赖项也可以创建和使用自己的线程。