我正在尝试学习 RxJava 并浏览一些内容。我经历过这个例子,订阅者在订阅 observable 后很快就取消订阅。
public static void main(String[] args) {
Observable<Integer> ints = Observable.create(subscriber -> {
Runnable r = () -> {
sleep(10, SECONDS);
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(5);
subscriber.onCompleted();
}
};
Thread thread = new Thread(r);
thread.start();
Subscription subscription = Subscriptions.create(thread::interrupt);
System.out.println("inner subscription : "+subscription);
subscriber.add(subscription);
});
Subscription subscription = ints.subscribe(x->System.out.println(x));
System.out.println("outer subscription : "+subscription);
subscription.unsubscribe();
}
static void sleep(int timeout, TimeUnit unit) {
try {
unit.sleep(timeout);
} catch (InterruptedException ignored) {
//intentionally ignored
}
}
当我们在 main 方法中创建订阅并将其添加到订阅者订阅回调中时,将立即中断线程,而无需等待 10 秒。
我的理解是,我们添加的订阅应该与调用取消订阅的订阅相同,但是当我在控制台上打印两者时,它们是不同的。
inner subscription : rx.subscriptions.BooleanSubscription@215be6bb outer subscription : rx.observers.SafeSubscriber@5d5eef3d
谢谢
- 如果订阅不同,调用取消订阅如何触发线程中断。
SafeSubscriber
包装了原始的 Subscriber
,在文档中你可以阅读:
SafeSubscriber 是 Subscriber 的包装器,确保 订阅者遵守 Observable 合约。
所以你的
unsubscribe()
调用最终会调用你的 subscription
,这会中断你的线程。您可以在调试器中自行调查它,查看实际 -> 订阅列表中的 SafeSubscriber
实例。
- 我们这里有 1 个订阅者有两个不同的订阅吗?
不,这里只有一个
Subscriber
,SafeSubscriber
只是原始 Subscriber
的包装,以确保正确遵循 RxJava 合约。
您还应该意识到,您的代码表明您正在遵循一些 rxjava1 指南/书籍,
Subscription
类在 4(?)年前在 rxjava2 中被 Disposable
类替换。目前 rxjava 版本为 3。