发布主题的订阅者收到的事件顺序

问题描述 投票:0回答:1

我有一个具有多个订阅者的发布主题:

这是班级:

class Real {

    private val publisher: PublishSubject<String> = PublishSubject.create()

    fun doPublish() {
        for (i in 1 until 20) {
            publisher.onNext("$i Hello")
        }
        publisher.onComplete()
    }

    fun doSubscribe() {
        publisher.subscribe {
            println("Subscriber1 $it")
        }

        publisher.subscribe {
            println("Subscriber2 $it")
        }

        publisher.subscribe {
            println("Subscriber3 $it")
        }

    }
}

[我打电话给doSubscribe()之前先打电话给doPublish()输出如下:

 Task :Main.main()
Subscriber1 1 Hello
Subscriber2 1 Hello
Subscriber3 1 Hello
Subscriber1 2 Hello
Subscriber2 2 Hello
Subscriber3 2 Hello
Subscriber1 3 Hello
Subscriber2 3 Hello
Subscriber3 3 Hello
Subscriber1 4 Hello
Subscriber2 4 Hello
Subscriber3 4 Hello
Subscriber1 5 Hello
Subscriber2 5 Hello
Subscriber3 5 Hello
Subscriber1 6 Hello
Subscriber2 6 Hello
Subscriber3 6 Hello
Subscriber1 7 Hello
Subscriber2 7 Hello
Subscriber3 7 Hello
Subscriber1 8 Hello
Subscriber2 8 Hello
Subscriber3 8 Hello
Subscriber1 9 Hello
Subscriber2 9 Hello
Subscriber3 9 Hello
Subscriber1 10 Hello
Subscriber2 10 Hello
Subscriber3 10 Hello
Subscriber1 11 Hello
Subscriber2 11 Hello
Subscriber3 11 Hello
Subscriber1 12 Hello
Subscriber2 12 Hello
Subscriber3 12 Hello
Subscriber1 13 Hello
Subscriber2 13 Hello
Subscriber3 13 Hello
Subscriber1 14 Hello
Subscriber2 14 Hello
Subscriber3 14 Hello
Subscriber1 15 Hello
Subscriber2 15 Hello
Subscriber3 15 Hello
Subscriber1 16 Hello
Subscriber2 16 Hello
Subscriber3 16 Hello
Subscriber1 17 Hello
Subscriber2 17 Hello
Subscriber3 17 Hello
Subscriber1 18 Hello
Subscriber2 18 Hello
Subscriber3 18 Hello
Subscriber1 19 Hello
Subscriber2 19 Hello
Subscriber3 19 Hello

根据上述程序,第一个订户首先接收事件,然后是第二个和第三个事件,这完全按照订阅的顺序进行。

此执行顺序是否得到保证?由于我找不到与此相关的文档。

kotlin rx-java rx-java2 rx-kotlin2
1个回答
0
投票

请查看PublishSubject实现:

订阅发生了什么?

创建一个PublishDisposable并通过添加方法将其添加到“订户”的数组中(b [n] = ps;)

现在PublishSubject具有一个订阅者数组,该数组遵循插入顺序

@Override
protected void subscribeActual(Observer<? super T> t) {
    PublishDisposable<T> ps = new PublishDisposable<T>(t, this);
    t.onSubscribe(ps);
    if (add(ps)) {
        // if cancellation happened while a successful add, the remove() didn't work
        // so we need to do it again
        if (ps.isDisposed()) {
            remove(ps);
        }
    } else {
        ...
    }
}

boolean add(PublishDisposable<T> ps) {
    for (;;) {
        PublishDisposable<T>[] a = subscribers.get();
        if (a == TERMINATED) {
            return false;
        }

        int n = a.length;
        @SuppressWarnings("unchecked")
        PublishDisposable<T>[] b = new PublishDisposable[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = ps;

        if (subscribers.compareAndSet(a, b)) {
            return true;
        }
    }
}

源现在通过onNext发出新值。 onNext方法显示订户的onNext-invocation的调用。用户数组从0 ... n开始迭代。 因此,将按插入顺序调用订户,因为必须按合同依次调用onNext。

可观察对象必须以串行方式(而不是并行方式)向观察者发出通知。他们可以从不同的线程发出这些通知,但是在通知之间必须存在正式的事前关系。 (http://reactivex.io/documentation/contract.html

@Override
public void onNext(T t) {
    ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
    for (PublishDisposable<T> pd : subscribers.get()) {
        pd.onNext(t);
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.