我正在使用来自不同外部源/订阅包装到不同Flowable的事件。源无关紧要,因为我可以用简单的循环重现问题。我有 :
这个简单的代码重现了
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Scheduler scheduler = Schedulers.from(executor);
List<FlowableEmitter<Long>> emitterList = new ArrayList<>();
for (int i=0; i<3; ++i) {
final int finalI = i;
Flowable.create( new FlowableOnSubscribe<Long>(){
@Override
public void subscribe(FlowableEmitter<Long> emitter) {
emitterList.add(emitter);
}
}, BackpressureStrategy.MISSING)
.observeOn(scheduler)
.subscribe(
val -> System.out.println(
"[" +Thread.currentThread().getName()
+ "] Flow:" + finalI
+ " > " + Long.toString(val)));
}
long state = 0;
for (int i=0; i<5; ++i) {
for (FlowableEmitter<Long> emitter: emitterList){
emitter.onNext(++state);
}
}
executor.shutdown();
我的问题是事件的消耗顺序与它们发出的顺序不同。如果我删除observeOn(调度程序)它工作正常,但我需要在不同的线程上发射器和订阅者。我也测试了不同的BackpressureStrategy,它没有帮助。 任何线索都按顺序订阅/打印所有号码(1,2,3,4,5 ...... 14,15),而不是我下面的内容
[pool-1-thread-1] Flow:0 > 1
[pool-1-thread-1] Flow:0 > 4
[pool-1-thread-1] Flow:0 > 7
[pool-1-thread-1] Flow:0 > 10
[pool-1-thread-1] Flow:0 > 13
[pool-1-thread-1] Flow:1 > 2
[pool-1-thread-1] Flow:1 > 5
[pool-1-thread-1] Flow:1 > 8
[pool-1-thread-1] Flow:1 > 11
[pool-1-thread-1] Flow:1 > 14
[pool-1-thread-1] Flow:2 > 3
[pool-1-thread-1] Flow:2 > 6
[pool-1-thread-1] Flow:2 > 9
[pool-1-thread-1] Flow:2 > 12
[pool-1-thread-1] Flow:2 > 15
如果重要的话,我正在使用rx-java 2.2.5和Java 8。
observeOn
不公平,如果还有更多的工作要做,可以拥抱排放线。通过循环发射,一个源可能准备发射更多,因此您无法获得完美的交错。另请注意,通过使用MISSING
策略,您很容易出现背压异常。尝试扩展项目中的requestObserveOn:
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Scheduler scheduler = Schedulers.from(executor);
List<FlowableEmitter<Long>> emitterList = new ArrayList<>();
for (int i=0; i<3; ++i) {
final int finalI = i;
Flowable.create( new FlowableOnSubscribe<Long>(){
@Override
public void subscribe(FlowableEmitter<Long> emitter) {
emitterList.add(emitter);
}
}, BackpressureStrategy.BUFFER)
// ---------------------------------------------------------
.compose(FlowableTransformers.requestObserveOn(scheduler))
// ---------------------------------------------------------
.subscribe(
val -> System.out.println(
"[" +Thread.currentThread().getName()
+ "] Flow:" + finalI
+ " > " + Long.toString(val)));
}
long state = 0;
for (int i=0; i<5; ++i) {
for (FlowableEmitter<Long> emitter: emitterList){
emitter.onNext(++state);
}
}
executor.shutdown();