RxJava2
kotlin
这很好,我可以合并两个可观察到的值
Observable.concat(countries(), animals())
.subscribeBy {
println(it)
}
此示例我无法理解,因为它使用了一个似乎带有ObservableSource的lambda,并且我想合并这两个ObservableSource,但它会导致null异常。只是想知道我在做什么错。在concat中使用lambda的目的是什么?
Observable.concat<String> {
it.onNext(countries())
it.onNext(animals())
}.subscribeBy {
println(it)
}
private fun animals(): Observable<String> =
Observable.just("fox", "cat", "dog", "bear", "bat", "hare", "lion", "tiger")
private fun countries(): Observable<String> =
Observable.just("England", "France", "Thailand", "America", "Scotland", "Ice Land")
这是我遇到的崩溃:
Exception in thread "main" java.lang.NullPointerException
at io.reactivex.internal.operators.observable.ObservableConcatMap$SourceObserver.onNext(ObservableConcatMap.java:129)
这是我想指的ObservableSource
的接口。
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
非常感谢您的任何建议
将lambda传递给concat解析为concat(ObservableSource<? extends ObservableSource<? extends T>> sources)
。由于concat(ObservableSource<? extends ObservableSource<? extends T>> sources)
是具有单个非默认方法的接口,因此会触发Kotlin的ObservableSource
。这就是为什么它会选择这种重载的原因-它是唯一具有可通过SAM转换实现的接口的重载。
因此,lambda成为SAM conversion方法的实现。该方法记录为:
将给定的Observer订阅此ObservableSource实例。
因此,lambda需要将参数(ObservableSource.subscribe(Observer<? super T> observer)
)订阅到ObservableSource.subscribe(Observer<? super T> observer)
的源。之所以得到it
,是因为您没有订阅它,而是开始在尚未订阅的Observables
上调用NullPointerException
,因此内部状态不正确(在这种情况下,尚未设置队列,但这并不是特别重要)。
要履行该方法的约定,您只需创建一个发出onNext
的Observer
并将lambda中的Observable
订阅给Observable
(it
),就像这样:
Observer
我已经在本地测试过,它可以产生预期的结果。