Project Reactor 中的默认调度程序是什么?

问题描述 投票:0回答:2

Project Reactor 中是否有“默认”调度程序?哪一个? “默认”是指当链没有调用

subscribeOn()
publishOn()
时使用的默认值。

spring-webflux project-reactor reactor
2个回答
4
投票

默认情况下,数据生产过程从发起订阅的

Thread
开始。处理时间的运算符(例如
Mono.delay
)将默认在
Schedulers.parallel()
调度程序上运行。

大多数反应式库(Reactive Redis、Mongo 等)都会使用

parallel
作为默认调度程序。

例如 Spring WebFlux,通常使用 Reactor Netty 作为默认嵌入式服务器,并会在

Schedulers.parallel()
上发起订阅。


0
投票

总结

Reactor 项目中没有类似“默认调度程序”的东西。这取决于许多因素,除非您仔细指定调度程序,否则您不应该使代码逻辑依赖于任何假设。

详细答案及来源

(我的答案或多或少与@Alex的相同,但我试图根据一些权威来源来获得它。)

Reactor 文档这么说

Reactor...可以被认为是并发无关的。也就是说,它不强制执行并发模型。相反,它让你(开发人员)来指挥。但是,这并不妨碍该库帮助您处理并发性。

获得

Flux
Mono
并不一定意味着它运行在专用线程中。相反,大多数运算符继续在前一个运算符执行的线程中工作。
除非指定,最顶层的运算符(源)本身在进行
subscribe()
调用的线程上运行。

因此,无需任何干预并使用“标准”运算符,例如

map()
flatMap()
filter()
,
最后的
subscribe()
在调用线程上运行。

某些操作员默认使用

Schedulers
中的特定调度程序(并且通常允许您选择提供不同的调度程序)。例如,调用
Flux.interval(Duration.ofMillis(300))
工厂方法会生成每 300 毫秒滴答一次的
Flux<Long>
。默认情况下,此功能由
Schedulers.parallel()
启用。

因此,只需调用 some 运算符即可更改默认线程。该方法的文档中总是提到它。例如。对于 Flux.interval(Duration):

Schedulers.parallel()
调度程序上运行。

但是如果您愿意,您可以更改它:

以下行将 Scheduler 更改为类似于 Schedulers.single() 的新实例:

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

似乎每个在不同调度程序上运行的

Mono
Flux
方法都有一个带有额外
Scheduler
参数的重载替代方案。

Spring WebFlux 文档告诉我们:

您应该在使用 Spring WebFlux 运行的服务器上看到哪些线程?

在“普通”Spring WebFlux 服务器上(例如,没有数据访问或其他可选依赖项),您可以预期服务器有一个线程,其他几个线程用于请求处理(通常与 CPU 核心的数量一样多)。然而,Servlet 容器可能会以更多线程启动(例如,Tomcat 上有 10 个线程),以支持 servlet(阻塞)I/O 和 servlet 3.1(非阻塞)I/O 使用。

反应式 WebClient 以事件循环方式运行。因此,您可以看到与之相关的少量固定数量的处理线程(例如,带有 Reactor Netty 连接器的reactor-http-nio-)。但是,如果客户端和服务器都使用 Reactor Netty,则默认情况下两者共享事件循环资源。

Reactor 和 RxJava 提供了线程池抽象,称为调度程序,与用于将处理切换到不同线程池的publishOn 运算符一起使用。调度程序的名称暗示了特定的并发策略——例如,“并行”(用于具有有限数量线程的 CPU 密集型工作)或“弹性”(用于具有大量线程的 I/O 密集型工作)。如果您看到这样的线程,则意味着某些代码正在使用特定的线程池调度程序策略。

数据访问库和其他第三方依赖项也可以创建和使用自己的线程。

© www.soinside.com 2019 - 2024. All rights reserved.