组合两个Observable流时如何发出最终值?

问题描述 投票:2回答:2

使用杰克沃顿的Managing State with RxJava模式。

我将两个api调用组合在一起并行执行。

两者完成后如何发出“成功”项目?

请参阅下面的代码中的注释。

谢谢!

主要呼叫功能:

    Observable
            .just(UpdatePicEvent(userId, file))
            .compose(updatePic()) <-- Handles updating pic, emits models consumed by UI
            .mergeWith(
                    Observable
                        .just(UpdateProfileEvent(..params...))
                        .compose(updateProfile()) <-- Handles updating other settings, emits models consumed by UI
            )

            // TODO Need to add something to emit a Success() model item when both actions above have completed  

            .subscribe(...pass models to UI...)

updatePic()

fun updatePic(): ObservableTransformer<UpdatePicEvent, ProfileSettingsModel> {
        return ObservableTransformer {
            it.flatMap {
                api.uploadProfilePic(it.userId, it.pic)
                    .map { UpdatePicSuccessful(it) as ProfileSettingsModel }
                    .onErrorReturn { UpdatePicError(it) as ProfileSettingsModel }
                    .startWith(UpdatePicInProgress() as ProfileSettingsModel)
            }
        }
    }

更新配置文件()

fun updateProfile(): ObservableTransformer<UpdateProfileEvent, ProfileSettingsModel> {
        return ObservableTransformer {
            it.flatMap {
                api
                    .updateUser(...params...)
                    .subscribeOn(Schedulers.io())
                    .map { UpdateProfileSuccessful(it) as ProfileSettingsModel }
                    .onErrorReturn { UpdateProfileError(it) as ProfileSettingsModel }
                    .observeOn(AndroidSchedulers.mainThread())
                    .startWith(UpdateProfileInProgress() as ProfileSettingsModel)
            }
        }
    }
android kotlin rx-java
2个回答
0
投票

你收到两个不同的ProfileSettingsModel你需要合并。如果您不想在订阅者中执行此操作,则可以使用zip operator来实现您的链。

通过指定的功能将多个Observable的排放组合在一起,并根据此功能的结果为每个组合发出单个项目

Observable.zip(
    Observable.just(UpdatePicEvent(userId, file)).compose(updatePic())
    Observable.just(UpdateProfileEvent(..params...)).compose(updateProfile())
    mergePicAndProfile)

mergePicAndProfileBiFunction,接收两个结果并发出单个实体。


0
投票

鉴于您对原始问题的评论,concat可以帮助您。文档的相关部分说:

Concat运算符连接多个Observable的输出,使它们像单个Observable一样,第一个Observable发出的所有项在第二个Observable发出的任何项之前发出(如果超过二)。

Concat等待订阅您传递给它的每个其他Observable,直到上一个Observable完成。

执行:

Observable.mergeDelayError(
    Observable.just(UpdatePicEvent(userId, file)).compose(updatePic()),
    Observable.just(UpdateProfileEvent(userId, params)).compose(updateProfile())    
)
    .concatWith(Observable.just(AllSuccessful()))  // Or whatever
    .onErrorReturn { when (it) {
        is UpdatePicException -> UpdatePicError(it)
        is UpdateProfileException -> UpdateProfileError(it)
    }}

这有两个关键因素:

  1. 你需要在onErrorReturnupdatePic变换器中替换updateProfile,这样当它们失败时它们实际上会发出错误(可能有类似onErrorResumeNext { Observable.error(MyTypeOfException()) }的东西。如果你不这样做,流就无法知道是否有错误或不(因为在项目中包装错误使其与成功无法区分)
  2. 使用mergeDelayError让每个流独立于另一个流继续。如果你只是使用merge,第一个失败将阻止另一个继续。

(请注意,我稍微修改了原始代码,恕我直言,更具可读性)

© www.soinside.com 2019 - 2024. All rights reserved.