RxJava - Observable的zip列表

问题描述 投票:10回答:3

我有一个Observables列表(RxJava 1)。

List<Observable> observableList = new ArrayList<>();

它可以包含至少1个Observable。每个都有相同类型的结果。

如何压缩所有Observable的结果?

我想到了zip-operator但它不支持List,我不知道可观察量的数量(可以是1,2,3,4 ....)

java android rx-java
3个回答
15
投票

你可以使用静态zip(java.lang.Iterable<? extends Observable<?>> ws,FuncN<? extends R> zipFunction) method

它是一个zip方法,它采用IterableObservableFuncN(它的varargs方法采用call参数)并使用它将相应的发射的Objects组合到结果中,由新返回的Observable省略。

例如,您可以这样做:

Observable.zip(observableList, new FuncN(){
    public ReturnType call(java.lang.Object... args){
        ReturnType result; //to be made
        //preparatory code for using the args
        for (Object obj : args){
            ReturnType retObj = (ReturnType)obj;
            //code to use the arg once at a time to combine N of them into one.
        }
        return result;
    }
});

6
投票

ReactiveX - Zip运算符

拉伸超过BiFunction

Zip通过指定的功能将多个Observable的发射组合在一起,并根据此功能的结果为每个组合发出单个项目

这里,list是您想要传递的任何类型的Observable的数组列表。

val list = arrayListOf<Observable<ImageUrlResponse>>()


Observable.zip(list) { args -> Arrays.asList(args) }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
        val response = it[0]
        val imageUrlResponse = imageUrlObject as ImageUrlResponse
        urls.add(imageUrlResponse.imageUrl)}
               }, {
        val c = it
 })

以下订阅的结果如下图所示。就像我们期望它被压缩在一起一样。你也可以注意到它返回所有要在单个java.lang.Object []中压缩的响应。

注意我必须键入cast我的数组列表来访问我的单个对象,因为它是Any类型!

enter image description here


0
投票

.stream-ping之前你可以.reduceList你的.zip

final Observable<YourType> result = observableList.stream()
        .reduce((observable1, observable2) ->
                Observable.zip(observable1, observable2, YourTypeCombiner::methodToCombineTwoObjectsOfYourType))
        .orElse(Observable.just(defaultValueForYourType));

例如结果将发出new Integer(10)

final List<Observable<Integer>> observableList = ImmutableList.of(
        Observable.just(1), Observable.just(2), Observable.just(3), Observable.just(4));

final Observable<Integer> result = observableList.stream()
        .reduce((num1, observable2) ->
                Observable.zip(observable1, observable2, Integer::sum))
        .orElse(Observable.just(0));

错误处理不包括在内。

© www.soinside.com 2019 - 2024. All rights reserved.