subscribe()方法是否阻塞?

问题描述 投票:0回答:1

我从 Project Reactor 开始,我无法理解为什么这段代码无限地打印“Hello”并且从不返回

Disposable
对象。

Flux<Object> flux = Flux.generate(sink -> sink.next("Hello"));
Disposable disposable = flux.subscribe(System.out::println);
disposable.dispose();
System.out.println("This doesn't print");

我认为,当调用

subscribe()
方法时,它必须立即返回
Disposable
对象,如果我愿意,我可以使用该对象取消订阅。我知道这个
subscribe
方法中的代码在同一个线程中运行,如果我在
delayElements
调用之前替换
subscribe
方法,那么下面的代码将起作用,因为它在单独的守护线程中运行,所以可以解释为什么吗?它是否停止在
subscribe
方法并且不返回
Disposable
以及是否有任何方法可以通过调用
subscribe
方法来管理订阅?是否可以类比
delayElements
方法,在单独的线程中执行,并且调用
subscribe
方法的结果立即返回
Disposable

我找不到这个问题的具体答案。在我看到的所有示例中,要么是有限数据流,要么使用了delayElements方法。

java project-reactor publish-subscribe blocking
1个回答
0
投票

是的,

subscribe()
在这个特定场景中会阻塞。

Reactor 是并发无关的,这意味着默认情况下它不会对您强制执行任何线程/异步性,并在调用线程(在本例中为主线程)上执行管道。

您可以使用

subscribeOn
publishOn
显式更改此设置,也可以使用
delayElements
等某些运算符隐式更改。

Flux<Object> flux = Flux.generate(sink -> sink.next("Hello")).publishOn(Schedulers.parallel());
Disposable disposable = flux.subscribe(System.out::println);
disposable.dispose();
System.out.println("This doesn't print");
© www.soinside.com 2019 - 2024. All rights reserved.