何时在RxJava中使用blockingSubscribe

问题描述 投票:0回答:1

在什么情况下需要在RxJava中使用blockingSubscribe?

[考虑以下内容,它可以打印结果。

Observable.just(1, 2, 3, 4, 5)
            .subscribe(line -> System.out.println(line));

虽然更改为以下内容,但不打印结果,但为什么打印?

Observable.interval(500, TimeUnit.MILLISECONDS)
            .subscribe(line -> System.out.println(line));

并且如果更改为使用blockingSubscribe,它可以再次显示结果。

Observable.interval(500, TimeUnit.MILLISECONDS)
            .blockingSubscribe(line -> System.out.println(line));

我读到“ blockingSubscribe”是阻塞主线程,但是为什么要阻塞主线程(就像在现实世界中的情况一样),如果不想阻塞主线程执行但仍想使用Observable.interval?

rx-java rx-java2
1个回答
0
投票

第一个示例在订阅线程的堆栈上运行。当您订阅subscribeAcutal时,将发生在调用线程中。然后,正义操作员将与订阅者同步地将所有项(1,2,3,4,5)依次打开。在每个onNext上,将呼叫订阅者并打印号码。

Observable.just(1, 2, 3, 4, 5)
            .subscribe(line -> System.out.println(line));

第二个示例将不打印任何内容,因为涉及并发并且您的主要方法“失败”。在内部,Interval-Operator使用调度程序。订阅后,间隔操作员将作业添加到调度程序中,该作业将在500毫秒内运行。调用线程调用实际订阅,操作员将作业添加到调度程序中并完成。堆栈上没有其他要弹出的堆栈框架。因此,subscribe-Method调用已完成,并且调用(主)线程能够继续进行。在您的情况下,没有更多方法可以调用,因此程序退出。程序将比发射快退出,发射将在500ms内完成。这就是为什么在System.out上看不到任何输出的原因。

Observable.interval(500, TimeUnit.MILLISECONDS)
            .subscribe(line -> System.out.println(line));

如果要在打印单据之前停止退出主线程,可以只添加一个

Thread.sleep(1_000)

主线程将在线程#sleep处停止1000ms。当主线程被阻塞时,调度程序将在另一个线程上发出一个值,然后该线程将调用订阅者的onNext。将调用println调用并打印值。

您想要在此处实现的是同步。主线程与另一个线程。这就是阻塞*-订阅开始起作用。如果要将一个线程与另一个线程连接,则必须阻塞该线程,直到到达特定信号为止。

如果不想阻止主线程执行,但仍想使用Observable.interval

您的应用程序中的执行主线程不允许完成。

如何实现?

仅创建/启动非守护线程(例如Android,SWT,JavaFX)^ 1

^ 1 Main method won't return

© www.soinside.com 2019 - 2024. All rights reserved.