我从 Java 反应堆开始,试图了解背压是如何工作的。 我写了下面的示例代码:
Flux<Long> publisher=Flux.interval(Duration.ofMillis(1));
publisher
.subscribeOn(Schedulers.newSingle("publisher-x"))
.map(x->{
System.out.println("mapper on Thread "+Thread.currentThread().getName()+" for item "+x);
return x;
})
.publishOn(Schedulers.newSingle("subscriber-x"))
.subscribe(new Subscriber<Long>() {
private Subscription subscription;
private int count = 0;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long item) {
try {
TimeUnit.MILLISECONDS.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("subscriber on Thread "+Thread.currentThread().getName()+" for item "+item);
count++;
/*if (count >= 5) {
subscription.cancel();
} else {s
subscription.request(1);
}*/
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done in thread "+Thread.currentThread().getName());
}
});
}
我原以为我的发布者会继续生成项目,直到出现反压异常,但我对我得到的结果感到惊讶:
mapper on Thread parallel-1 for item 0
mapper on Thread parallel-1 for item 1
...
mapper on Thread parallel-1 for item 255
subscriber on Thread subscriber-x-1 for item 0
subscriber on Thread subscriber-x-1 for item 1
...
subscriber on Thread subscriber-x-1 for item 255
reactor.core.Exceptions$OverflowException: Could not emit tick 256 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:132)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
我不明白为什么我的发布者在第 255 次迭代时停止,为什么我只在我的订阅者完成处理所有项目后才得到异常(当环形缓冲区为空时抛出异常!!) 谁能帮助理解这一点?