从订阅 RxJava 中调用取消订阅的回调

问题描述 投票:0回答:1

我正在尝试学习 RxJava 并浏览一些内容。我经历过这个例子,订阅者在订阅 observable 后很快就取消订阅。

    public static void main(String[] args) {
        Observable<Integer> ints = Observable.create(subscriber -> {
            Runnable r = () -> {
                sleep(10, SECONDS);
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(5);
                    subscriber.onCompleted();
                }
            };
            Thread thread = new Thread(r);
            thread.start();
            Subscription subscription = Subscriptions.create(thread::interrupt);
            System.out.println("inner subscription : "+subscription);
            subscriber.add(subscription);
        });

        Subscription subscription = ints.subscribe(x->System.out.println(x));
        System.out.println("outer subscription : "+subscription);
        subscription.unsubscribe();
    }

    static void sleep(int timeout, TimeUnit unit) {
        try {
            unit.sleep(timeout);
        } catch (InterruptedException ignored) {
            //intentionally ignored
        }
    }

当我们在 main 方法中创建订阅并将其添加到订阅者订阅回调中时,将立即中断线程,而无需等待 10 秒。

我的理解是,我们添加的订阅应该与调用取消订阅的订阅相同,但是当我在控制台上打印两者时,它们是不同的。

  1. 如果订阅不同,调用取消订阅如何触发线程中断。
inner subscription : rx.subscriptions.BooleanSubscription@215be6bb

outer subscription : rx.observers.SafeSubscriber@5d5eef3d
  1. 我们这里有 1 个订阅者有两个不同的订阅吗?

谢谢

java multithreading observable rx-java subscriber
1个回答
0
投票
  1. 如果订阅不同,调用取消订阅如何触发线程中断。

SafeSubscriber
包装了原始的
Subscriber
,在文档中你可以阅读:

SafeSubscriber 是 Subscriber 的包装器,确保 订阅者遵守 Observable 合约。

所以你的

unsubscribe()
调用最终会调用你的
subscription
,这会中断你的线程。您可以在调试器中自行调查它,查看实际 -> 订阅列表中的
SafeSubscriber
实例。

  1. 我们这里有 1 个订阅者有两个不同的订阅吗?

不,这里只有一个

Subscriber
SafeSubscriber
只是原始
Subscriber
的包装,以确保正确遵循 RxJava 合约。

您还应该意识到,您的代码表明您正在遵循一些 rxjava1 指南/书籍,

Subscription
类在 4(?)年前在 rxjava2 中被
Disposable
类替换。目前 rxjava 版本为 3。

© www.soinside.com 2019 - 2024. All rights reserved.