如何相应地链接Dependent Observable

问题描述 投票:2回答:2

RxJava相对较新。有以下要求。任何指针都会有所帮助。例如: - 有3个可观察的A,B,C这样的

  1. A从数据源(如DB)发出数据
  2. B&C都依赖于来自A的数据。
  3. B&C应该并行执行
  4. 可观察的D要求B&C拉链并产生综合结果。

尝试过类似下面的内容,但它没有解决上面#4中需要Observables B&C以达到D到zip(B,C)的问题。

Observable<A>.just("AdataSource")
  .flatMap(Asdata->{ callB(Asdata) },(Asdata,BsResult)->{callC(AsData)};
rx-java rx-java2
2个回答
3
投票

您可以使用publish()运算符的内部形式。

Observable<A> dataSource;
Observable<Pair<B, C>> dObservable = dataSource
  .publish( ds ->
    Observable.zip( callB( ds ), callC( ds ), (b, c) -> new Pair<>(b, c)));

callB()callC()方法有这些签名:

Observable<B> callB( Observable<A> dataSource );
Observable<C> callC( Observable<A> dataSource );

publish()算子所做的是将观察者与ds绑定,然后由BC观察者链共享。


0
投票

您可以使用ObservableB和ObservableC的zip来平面地映射ObservableA。

假设这是你的ObservableA,ObservableB和ObservableC。

(Kotlin中的示例代码)

fun getObservableA(): Observable<Int> {
    return Observable.just(1, 2, 3)
}

fun getObservableB(num: Int): Observable<String> {
    return Observable.just("${num}A", "${num}B")
}

fun getObservableC(num: Int): Observable<String> {
    return Observable.just("${num}a", "${num}b", "${num}c")
}

你可以从调用getObservableA()和flatMap开始,将其结果调整为getObservableB(num)getObservableC(num)的拉链。

getObservableA()
        .flatMap { num ->
            Observable.zip(
                    getObservableB(num),
                    getObservableC(num),
                    BiFunction { t1: String, t2: String -> t1 to t2 }
            )
        }
        .subscribe(
                { (t1, t2) ->
                    // This will emit 6 times as follows:-
                    //   t1 = 1A, t2 = 1a
                    //   t1 = 1B, t2 = tb
                    //   t1 = 2A, t2 = 2a
                    //   t1 = 2B, t2 = 2b
                    //   t1 = 3A, t2 = 3a
                    //   t1 = 3B, t2 = 3b
                }
        )
© www.soinside.com 2019 - 2024. All rights reserved.