RxJava 合并订阅者仅从第一个可观察到的结果

问题描述 投票:0回答:3

我想了解 rxjava 合并是如何工作的。所以这里是简单的代码,应该合并 2 个可观察量的结果并发送给订阅者

    Observable.merge(getObservable(), getTimedObservable())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override public void call(final String s) {
                        Log.i("test", s);
                    }
                });

    private Observable<String> getTimedObservable() {
        return Observable.interval(150, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, String>() {
                    @Override public String call(final Long aLong) {
                        Log.i("test", "tick thread: " + Thread.currentThread().getId());
                        return String.valueOf(aLong);
                    }
                });
    }

    public Observable<String> getObservable() {
        return  Observable.create(new Observable.OnSubscribe<String>() {
            @Override public void call(final Subscriber<? super String> subscriber) {
                try {
                    Log.i("test", "simple observable thread: " + Thread.currentThread().getId());
                    for (int i = 1; i <= 10; i++) {
                        subscriber.onNext(String.valueOf(i * 100));
                        Thread.sleep(300);
                    }
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });
    }

我预计订阅者的合并结果会像

100 0 1 200 2 300 4 5 400

或者类似的东西,然而,实际结果是:

 test: simple observable thread: 257 
test: 100
test: 200
test: 300
test: 400
test: 500
test: 600
test: 700
test: 800
test: 900
test: 1000
test: tick thread: 254
test: 0
test: tick thread: 254
test: 1
test: tick thread: 254
test: 2
test: tick thread: 254
test: 3
test: tick thread: 254
test: 4
test: tick thread: 254
test: 5
test: tick thread: 254
test: 6
test: tick thread: 254
test: 7
test: tick thread: 254
test: 8
test: tick thread: 254
test: 9
test: tick thread: 254
test: 10
test: tick thread: 254
test: 11
test: tick thread: 254
test: 12
test: tick thread: 254
test: 13

看起来第一个 Observable 中的 Thread.sleep 块在第二个 Observable 中发出,但我不明白如何实现。有人可以解释一下吗?

rx-java
3个回答
6
投票

merge 将同时订阅两个可观察量。首先被订阅的可观察对象将在调用线程上产生值。因为调用线程被 observable1 阻塞,所以 observable2 无法产生值。 SubscribeOn 只会说明订阅将在哪里发生。假设 observable 开始在 main-1 上生成值。下游的每个值都将位于同一线程上。没有并发发生。

如果你想实现并发,你必须为每个可观察者说明订阅必须在哪里发生。假设我们有 Observables.merge 和两个 observables。 Observable1 和 Observable2 有 subscribeOn 和一些线程池。每个 observable 都会在给定的 subscribeOn 线程上生成值。您实现了并发。

请查看编辑后的输出:

@Test
public void name() throws Exception {
    Subscription subscribe = Observable.merge(getObservable(), getTimedObservable())
            //.observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> {

                System.out.println("subscription " + s);
                //Log.i("test", s);
            });


    Thread.sleep(5_000);
}

private Observable<String> getTimedObservable() {
    return Observable.interval(150, TimeUnit.MILLISECONDS)
            .map(aLong -> {
                System.out.println("getTimedObservable: " + Thread.currentThread().getId());

                //Log.i("test", "tick thread: " + Thread.currentThread().getId());
                return String.valueOf(aLong);
            }).subscribeOn(Schedulers.io());
}

private Observable<String> getObservable() {
    return Observable.<String>create(subscriber -> {
        try {
            for (int i = 1; i <= 10; i++) {
                System.out.println("getObservable: " + Thread.currentThread().getId());
                subscriber.onNext(String.valueOf(i * 100));
                Thread.sleep(300);
            }
            subscriber.onCompleted();
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

1
投票

RxJava 默认情况下不是多线程的,一切都在同一个线程上运行。如果你想要多线程,你需要使用调度程序。

.subscribe(Subscribers.io())
末尾添加
getObservable()


0
投票

merge同时订阅两个observables,第一个observable1第二个observable2,每个observables应该在自己的线程上工作

subscribeOn
,因此,我们从两个observables接收数据

val observable1 = Observable.create<Int> {
    it.onNext(1)
    Thread.sleep(1000)
    it.onNext(2)
}.subscribeOn(Schedulers.io())

val observable2 = Observable.create<Int> {
    it.onNext(3)
    it.onNext(4)
    Thread.sleep(1000)
    it.onNext(5)
}.subscribeOn(Schedulers.io())

observable1.mergeWith(observable2).subscribe {}

结果 1, 3, 4, 2, 5

© www.soinside.com 2019 - 2024. All rights reserved.