如果我创建一个
Sinks.Many
实例:
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
然后在另一个线程(T1)中,我订阅了它的
Flux
:
new Thread(() -> {
sink.asFlux().take(5).subscribe(
m -> System.out.println("T1: " + m), //onNext
System.out::println, //onError
() -> System.out.println("T1 DONE")); //onComplete
})
.start();
并在“主”线程中发出一些元素:
for (int i = 0; i < 10; i++) {
sink.tryEmitNext(i);
Thread.sleep(500);
}
//Verify that 0 subscribers are left at this point
System.out.println("Subscribers: " + sink.currentSubscriberCount());
我看到 T1 按预期打印 0-4 和“完成”,但此时接收器变得毫无用处——所有进一步的订阅由于某种原因立即完成,并且排放不再做任何可观察到的事情。
例如如果我尝试再次订阅并发出:
new Thread(() -> {
sink.asFlux().take(5).subscribe(
m -> System.out.println("T2: " + m), //onNext
System.out::println, //onError
() -> System.out.println("T2 DONE")); //onComplete
})
.start();
for (int i = 0; i < 10; i++) {
Sinks.EmitResult emitResult = sink.tryEmitNext(i);
if (emitResult.isFailure()) {
throw new RuntimeException(); //Never happens
}
Thread.sleep(500);
}
T2 立即打印“T2 DONE”,因为
Flux
看似已完成,但后续的每个 sink.tryEmitNext
仍然成功 (isSuccessful() == true
)。
我显然无法理解
Sinks.many().multicast()
应该如何工作,因为这种行为对我来说毫无意义。当 Flux
从未被调用时,是什么完成了 (try)EmitComplete
?如果没有任何东西可以订阅它,为什么接收器仍然成功发射?它陷入了一个既完整又不完整的困境。
我错过了什么?我是否应该使用不同的结构(见下文)来拥有一个无限的接收器,允许订阅者随意来去?
如果我将我的水槽定义为:
//0 because I don't actually want any replay
Sinks.Many<Integer> sink = Sinks.many().replay().limit(Duration.ZERO);
我得到了我期望的结果 - T2 也得到了它的 5 个元素。
此行为的原因是
onBackpressureBuffer()
的预期行为,默认情况下启用 autoCancel
(尽管方法文档本身中未记录),一旦所有订阅者取消订阅即可完成接收器。
要禁用它,请使用重载:
onBackpressureBuffer(int bufferSize, boolean autoCancel)
。
示例:
Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);