如何强制 Reactor 在非守护线程上运行 subscribe() 代码?

问题描述 投票:0回答:1

这是这个问题的后续,而这又是这个问题

的后续

为了使这些消费者产生任何影响并产生任何输出到标准输出,您不能按原样运行此代码:

@Test
void test() {
    WebClient.builder().build()
        .get()
        .uri("https://httpbin.org/status/404")
        .retrieve()
        .bodyToMono(String.class)
        .subscribe(
                s -> System.out.println("Success: " + s),
                t -> System.out.println("Failure: " + t.getMessage())
        );
}

我的第一个工作解决方案是添加

sleep()
调用。就在那时我意识到我的代码(那些 println 的)是在守护线程上运行的

我希望 JVM 实际执行这些打印语句之一,而不是立即调用它退出,而不需要

block()
ing。为了实现这一点,我需要让 Reactor 分配常规线程,而不是守护进程

我尝试通过附加来做到这一点

.subscribeOn(Schedulers.newParallel("my-scheduler", 5, false))

到方法链(注意

false
,应该表示“非守护进程”)。遗憾的是,Reactor 仍然在其默认守护进程上运行代码(没有日志)

我尝试了工厂提供的其他一些

Scheduler
(包括一些“有界弹性”
Scheduler
,无论这意味着什么)。没有变化

如何强制 Reactor 在非守护线程上运行

subscribe()
代码?

java project-reactor
1个回答
0
投票

免责声明:我不太确定您想要实现什么目标。 如果我理解得很好,那么您正在尝试更好地理解 Reactor 执行模型以及如何控制它。因此,这篇文章不会是直接“这样做”的答案。相反,我将尝试提供有关 Reactor 控制流/线程模型以及如何使用它的见解

Reactor 生命周期:组装和执行

首先,我们来谈谈汇编与执行。当您使用 Reactor 方法时,大多数时候,您会通过传入消息流“组装”操作管道。管道执行需要调用 subscribe()

block()
每个中间操作都是一个隐藏的订阅,它接收上游流消息/值,对其进行处理,然后将结果发送到下游订阅(链中的下一个操作)。
每个操作/订阅都可以指定它希望在其中运行的

scheduler

(即线程池)。 例如, flatMap 默认指定

parallel scheduler
,以允许同时处理多个值。
这意味着管道运行可以涉及来自多个上下文的多个线程,并且所涉及的线程不会被管道“占用”。
下一节将详细介绍。

订阅

当调用任何形式的

subscribe()

时,Reactor 会在调用线程上触发执行。但是,重要的是:它仍然

异步运行
,并且它不会阻塞正在运行/计划的线程。这意味着当你做这样的事情时: public static void main() { Flux.just(1, 2, 3, 4).subscribe(i -> System.out.println("Next value: "+i)); System.out.println("Hello from main method"); }

所有打印都将从主线程完成,但是您仍然无法确定按什么顺序,因为 subscribe() 将在消耗通量之前返回。 Flux 将在主线程上执行,但仅当它不忙于其他任务时才执行。这意味着在上面的示例中,主线程可以很好地在流程执行完成之前退出 main 方法。

如果您手动订阅 Flux,并且希望将其完成与程序中的特定点同步/连接,则必须自己添加同步屏障。如果我们使用上面的示例,并且想要确保 main 方法在 Flux 结束之前不会退出,我们要么必须使用

block()

(这可能会导致问题,特别是默认情况下使用阻塞不兼容线程池的 WebClient 代码) ,或者自己添加一个障碍,比如:

public static void main() {
    var barrier = new CountDownLatch(1);
    Flux.just(1, 2, 3, 4).subscribe(
        i -> System.out.println("Next value: "+i),
        error -> barrier.countDown(),
        barrier::countDown
    );
    System.out.println("Hello from main method");
    
    // Wait for flux completion before exiting method
    barrier.await();
}

关于订阅更深入的解释,可以查看另一个答案:
Spring Reactor的线程模型是什么

注意

:在链接的答案中,您还会找到另一种(低效)方法来等待通量完成。 管理执行

大多数时候,您不想或不需要自己调用订阅。当使用任何 Spring 反应式技术(如 Webflux、Cloud Stream 等)时,您将发布者(flux 或 mono)返回给 Spring 组件,这将负责正确调度和运行您的管道。

例如使用Webflux时,Flux的执行或取消绑定到Netty异步IO,以正确完成或取消HTTP响应。

进行单元测试时,您应该使用

StepVerifier

WebTestClient
,它们依次运行您的管道并确保其完成。
    

© www.soinside.com 2019 - 2024. All rights reserved.