对单个Observable的并行map()操作,并接收乱序的结果

问题描述 投票:1回答:2

考虑到Observable<Input>和昂贵的映射函数Function<Input, Output>,但需要花费可变的时间,有没有办法在多个输入上并行调用映射函数,并按产生的顺序接收输出?] >

我尝试将observeOn()与多线程Scheduler结合使用:

PublishSubject<Input> inputs = PublishSubject.create();
Function<Input, Output> mf = ...
Observer<Output> myObserver = ...

// Note: same results with newFixedThreadPool(2)
Executor exec = Executors.newWorkStealingThreadPool();

// Use ConnectableObservable to make sure mf is called only once
// no matter how many downstream observers
ConnectableObservable<Output> outputs = inputs
    .observeOn(SchedulersFrom(exec))
    .map(mf)
    .publish();
outputs.subscribe(myObserver1);
outputs.subscribe(myObserver2);
outputs.connect();

inputs.onNext(slowInput); // `mf.apply()` takes a long time to complete on this input
inputs.onNext(fastInput); // `mf.apply()` takes a short time to complete on this input

但是在测试中,直到mf.apply(fastInput)完成后才调用mf.apply(slowInput)。>

[如果我在测试中使用CountDownLatch进行一些技巧,以确保mf.apply(slowInput)直到mf.apply(fastInput)之后才能完成,否则程序会死锁。

我应该在这里使用一些简单的运算符,还是仅仅因为RxJava的原因而使Observables混乱,我应该使用其他技术?


ETA:

我看过使用ParallelFlowable(在订购Flowable或更确切地说.sequential()之前将其与myObserver1/2转换回普通的mySubscriber1/2),但是后来我得到了额外的mf.apply()呼叫,每个Subscriber每输入一个。有ConnectableFlowable,但我不太想弄清楚如何将其与.parallel()混合。

给定一个Observable 和一个映射函数Function 昂贵,但是需要可变的时间,有没有办法在多个...上并行调用映射函数...

concurrency parallel-processing reactive-programming rx-java2
2个回答
1
投票

我想observeOn运算符不支持单独执行并发执行。那么,如何使用flatMap?假设mf功能需要很多时间。

    ConnectableObservable<Output> outputs = inputs
        .flatMap(it -> Observable.just(it)
            .observeOn(SchedulersFrom(exec))
            .map(mf))
        .publish();

0
投票

对我来说听起来不可能,除非Rx拥有一些非常专业的操作员。如果使用flatMap进行映射,则元素将无序到达。或者,您可以使用concatMap,但随后您将丢失想要的并行映射。

© www.soinside.com 2019 - 2024. All rights reserved.