我有一个Observables列表(RxJava 1)。
List<Observable> observableList = new ArrayList<>();
它可以包含至少1个Observable。每个都有相同类型的结果。
如何压缩所有Observable的结果?
我想到了zip-operator但它不支持List,我不知道可观察量的数量(可以是1,2,3,4 ....)
你可以使用静态zip(java.lang.Iterable<? extends Observable<?>> ws,FuncN<? extends R> zipFunction)
method。
它是一个zip
方法,它采用Iterable
的Observable
和FuncN
(它的varargs方法采用call
参数)并使用它将相应的发射的Object
s组合到结果中,由新返回的Observable
省略。
例如,您可以这样做:
Observable.zip(observableList, new FuncN(){
public ReturnType call(java.lang.Object... args){
ReturnType result; //to be made
//preparatory code for using the args
for (Object obj : args){
ReturnType retObj = (ReturnType)obj;
//code to use the arg once at a time to combine N of them into one.
}
return result;
}
});
ReactiveX - Zip运算符
拉伸超过BiFunction
Zip通过指定的功能将多个Observable的发射组合在一起,并根据此功能的结果为每个组合发出单个项目
这里,list是您想要传递的任何类型的Observable的数组列表。
val list = arrayListOf<Observable<ImageUrlResponse>>()
Observable.zip(list) { args -> Arrays.asList(args) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
val response = it[0]
val imageUrlResponse = imageUrlObject as ImageUrlResponse
urls.add(imageUrlResponse.imageUrl)}
}, {
val c = it
})
以下订阅的结果如下图所示。就像我们期望它被压缩在一起一样。你也可以注意到它返回所有要在单个java.lang.Object []中压缩的响应。
注意我必须键入cast我的数组列表来访问我的单个对象,因为它是Any类型!
在.stream
-ping之前你可以.reduce
和List
你的.zip
:
final Observable<YourType> result = observableList.stream()
.reduce((observable1, observable2) ->
Observable.zip(observable1, observable2, YourTypeCombiner::methodToCombineTwoObjectsOfYourType))
.orElse(Observable.just(defaultValueForYourType));
例如结果将发出new Integer(10)
final List<Observable<Integer>> observableList = ImmutableList.of(
Observable.just(1), Observable.just(2), Observable.just(3), Observable.just(4));
final Observable<Integer> result = observableList.stream()
.reduce((num1, observable2) ->
Observable.zip(observable1, observable2, Integer::sum))
.orElse(Observable.just(0));
错误处理不包括在内。