考虑到Observable<Input>
和昂贵的映射函数Function<Input, Output>
,但需要花费可变的时间,有没有办法在多个输入上并行调用映射函数,并按产生的顺序接收输出?] >
我尝试将observeOn()
与多线程Scheduler
结合使用:
PublishSubject<Input> inputs = PublishSubject.create(); Function<Input, Output> mf = ... Observer<Output> myObserver = ... // Note: same results with newFixedThreadPool(2) Executor exec = Executors.newWorkStealingThreadPool(); // Use ConnectableObservable to make sure mf is called only once // no matter how many downstream observers ConnectableObservable<Output> outputs = inputs .observeOn(SchedulersFrom(exec)) .map(mf) .publish(); outputs.subscribe(myObserver1); outputs.subscribe(myObserver2); outputs.connect(); inputs.onNext(slowInput); // `mf.apply()` takes a long time to complete on this input inputs.onNext(fastInput); // `mf.apply()` takes a short time to complete on this input
但是在测试中,直到
mf.apply(fastInput)
完成后才调用mf.apply(slowInput)
。>[如果我在测试中使用
CountDownLatch
进行一些技巧,以确保mf.apply(slowInput)
直到mf.apply(fastInput)
之后才能完成,否则程序会死锁。
我应该在这里使用一些简单的运算符,还是仅仅因为RxJava的原因而使Observables
混乱,我应该使用其他技术?
ETA:
我看过使用ParallelFlowable
(在订购Flowable
或更确切地说.sequential()
之前将其与myObserver1/2
转换回普通的mySubscriber1/2
),但是后来我得到了额外的mf.apply()
呼叫,每个Subscriber
每输入一个。有ConnectableFlowable
,但我不太想弄清楚如何将其与.parallel()
混合。给定一个Observable 和一个映射函数Function 昂贵,但是需要可变的时间,有没有办法在多个...上并行调用映射函数...
我想observeOn
运算符不支持单独执行并发执行。那么,如何使用flatMap?假设mf
功能需要很多时间。
ConnectableObservable<Output> outputs = inputs
.flatMap(it -> Observable.just(it)
.observeOn(SchedulersFrom(exec))
.map(mf))
.publish();
对我来说听起来不可能,除非Rx拥有一些非常专业的操作员。如果使用flatMap
进行映射,则元素将无序到达。或者,您可以使用concatMap
,但随后您将丢失想要的并行映射。