如何在lambda和ObservableSource中使用concat

问题描述 投票:0回答:1
RxJava2
kotlin 

这很好,我可以合并两个可观察到的值

   Observable.concat(countries(), animals())
                    .subscribeBy {
                        println(it)
                    }

此示例我无法理解,因为它使用了一个似乎带有ObservableSource的lambda,并且我想合并这两个ObservableSource,但它会导致null异常。只是想知道我在做什么错。在concat中使用lambda的目的是什么?

        Observable.concat<String> {
                    it.onNext(countries())
                    it.onNext(animals())
                }.subscribeBy {
                    println(it)
                }


    private fun animals(): Observable<String> =
            Observable.just("fox", "cat", "dog", "bear", "bat", "hare", "lion", "tiger")

    private fun countries(): Observable<String> =
            Observable.just("England", "France", "Thailand", "America", "Scotland", "Ice Land")

这是我遇到的崩溃:

Exception in thread "main" java.lang.NullPointerException
    at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:129)

这是我想指的ObservableSource的接口。

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

非常感谢您的任何建议

kotlin rx-java2
1个回答
0
投票

将lambda传递给concat解析为concat(ObservableSource<? extends ObservableSource<? extends T>> sources)。由于concat(ObservableSource<? extends ObservableSource<? extends T>> sources)是具有单个非默认方法的接口,因此会触发Kotlin的ObservableSource。这就是为什么它会选择这种重载的原因-它是唯一具有可通过SAM转换实现的接口的重载。

因此,lambda成为SAM conversion方法的实现。该方法记录为:

将给定的Observer订阅此ObservableSource实例。

因此,lambda需要将参数(ObservableSource.subscribe(Observer<? super T> observer))订阅到ObservableSource.subscribe(Observer<? super T> observer)的源。之所以得到it,是因为您没有订阅它,而是开始在尚未订阅的Observables上调用NullPointerException,因此内部状态不正确(在这种情况下,尚未设置队列,但这并不是特别重要)。

要履行该方法的约定,您只需创建一个发出onNextObserver并将lambda中的Observable订阅给Observableit),就像这样:

Observer

我已经在本地测试过,它可以产生预期的结果。

© www.soinside.com 2019 - 2024. All rights reserved.