为了使这些消费者产生任何影响并产生任何输出到标准输出,您不能按原样运行此代码:
@Test
void test() {
WebClient.builder().build()
.get()
.uri("https://httpbin.org/status/404")
.retrieve()
.bodyToMono(String.class)
.subscribe(
s -> System.out.println("Success: " + s),
t -> System.out.println("Failure: " + t.getMessage())
);
}
我的第一个工作解决方案是添加
sleep()
调用。就在那时我意识到我的代码(那些 println 的)是在守护线程上运行的
我希望 JVM 实际执行这些打印语句之一,而不是立即调用它退出,而不需要
block()
ing。为了实现这一点,我需要让 Reactor 分配常规线程,而不是守护进程
我尝试通过附加来做到这一点
.subscribeOn(Schedulers.newParallel("my-scheduler", 5, false))
到方法链(注意
false
,应该表示“非守护进程”)。遗憾的是,Reactor 仍然在其默认守护进程上运行代码(没有日志)
我尝试了工厂提供的其他一些
Scheduler
(包括一些“有界弹性”Scheduler
,无论这意味着什么)。没有变化
如何强制 Reactor 在非守护线程上运行
subscribe()
代码?
免责声明:我不太确定您想要实现什么目标。 如果我理解得很好,那么您正在尝试更好地理解 Reactor 执行模型以及如何控制它。因此,这篇文章不会是直接“这样做”的答案。相反,我将尝试提供有关 Reactor 控制流/线程模型以及如何使用它的见解。
首先,我们来谈谈汇编与执行。当您使用 Reactor 方法时,大多数时候,您会通过传入消息流“组装”操作管道。管道执行需要调用 subscribe()
或
block()
。每个中间操作都是一个隐藏的订阅,它接收上游流消息/值,对其进行处理,然后将结果发送到下游订阅(链中的下一个操作)。
每个操作/订阅都可以指定它希望在其中运行的scheduler(即线程池)。
例如, flatMap
默认指定
parallel scheduler
,以允许同时处理多个值。这意味着管道运行可以涉及来自多个上下文的多个线程,并且所涉及的线程不会被管道“占用”。
下一节将详细介绍。
订阅
subscribe()
时,Reactor 会在调用线程上触发执行。但是,重要的是:它仍然
异步运行,并且它不会阻塞正在运行/计划的线程。这意味着当你做这样的事情时:
public static void main() {
Flux.just(1, 2, 3, 4).subscribe(i -> System.out.println("Next value: "+i));
System.out.println("Hello from main method");
}
所有打印都将从主线程完成,但是您仍然无法确定按什么顺序,因为 subscribe() 将在消耗通量之前返回。 Flux 将在主线程上执行,但仅当它不忙于其他任务时才执行。这意味着在上面的示例中,主线程可以很好地在流程执行完成之前退出 main 方法。
如果您手动订阅 Flux,并且希望将其完成与程序中的特定点同步/连接,则必须自己添加同步屏障。如果我们使用上面的示例,并且想要确保 main 方法在 Flux 结束之前不会退出,我们要么必须使用
block()
(这可能会导致问题,特别是默认情况下使用阻塞不兼容线程池的 WebClient 代码) ,或者自己添加一个障碍,比如:
public static void main() {
var barrier = new CountDownLatch(1);
Flux.just(1, 2, 3, 4).subscribe(
i -> System.out.println("Next value: "+i),
error -> barrier.countDown(),
barrier::countDown
);
System.out.println("Hello from main method");
// Wait for flux completion before exiting method
barrier.await();
}
关于订阅更深入的解释,可以查看另一个答案:Spring Reactor的线程模型是什么注意
:在链接的答案中,您还会找到另一种(低效)方法来等待通量完成。 管理执行
例如使用Webflux时,Flux的执行或取消绑定到Netty异步IO,以正确完成或取消HTTP响应。
进行单元测试时,您应该使用
StepVerifier
或
WebTestClient
,它们依次运行您的管道并确保其完成。