我从 Project Reactor 开始,我无法理解为什么这段代码无限地打印“Hello”并且从不返回
Disposable
对象。
Flux<Object> flux = Flux.generate(sink -> sink.next("Hello"));
Disposable disposable = flux.subscribe(System.out::println);
disposable.dispose();
System.out.println("This doesn't print");
我认为,当调用
subscribe()
方法时,它必须立即返回 Disposable
对象,如果我愿意,我可以使用该对象取消订阅。我知道这个 subscribe
方法中的代码在同一个线程中运行,如果我在 delayElements
调用之前替换 subscribe
方法,那么下面的代码将起作用,因为它在单独的守护线程中运行,所以可以解释为什么吗?它是否停止在 subscribe
方法并且不返回 Disposable
以及是否有任何方法可以通过调用 subscribe
方法来管理订阅?是否可以类比delayElements
方法,在单独的线程中执行,并且调用subscribe
方法的结果立即返回Disposable
?
我找不到这个问题的具体答案。在我看到的所有示例中,要么是有限数据流,要么使用了delayElements方法。
是的,
subscribe()
在这个特定场景中会阻塞。
Reactor 是并发无关的,这意味着默认情况下它不会对您强制执行任何线程/异步性,并在调用线程(在本例中为主线程)上执行管道。
您可以使用
subscribeOn
或 publishOn
显式更改此设置,也可以使用 delayElements
等某些运算符隐式更改。
Flux<Object> flux = Flux.generate(sink -> sink.next("Hello")).publishOn(Schedulers.parallel());
Disposable disposable = flux.subscribe(System.out::println);
disposable.dispose();
System.out.println("This doesn't print");