下沉。在订阅者数量第一次达到 0 后,许多默默地变得毫无用处

问题描述 投票:0回答:1

如果我创建一个

Sinks.Many
实例:

Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

然后在另一个线程(T1)中,我订阅了它的

Flux

new Thread(() -> {
    sink.asFlux().take(5).subscribe(
        m -> System.out.println("T1: " + m),  //onNext
        System.out::println,                  //onError
        () -> System.out.println("T1 DONE")); //onComplete
})
.start();

并在“主”线程中发出一些元素:

for (int i = 0; i < 10; i++) {
    sink.tryEmitNext(i);
    Thread.sleep(500);
}

//Verify that 0 subscribers are left at this point
System.out.println("Subscribers: " + sink.currentSubscriberCount());

我看到 T1 按预期打印 0-4 和“完成”,但此时接收器变得毫无用处——所有进一步的订阅由于某种原因立即完成,并且排放不再做任何可观察到的事情。

例如如果我尝试再次订阅并发出:

new Thread(() -> {
    sink.asFlux().take(5).subscribe(
        m -> System.out.println("T2: " + m),  //onNext
        System.out::println,                  //onError
        () -> System.out.println("T2 DONE")); //onComplete
})
.start();
    
for (int i = 0; i < 10; i++) {
    Sinks.EmitResult emitResult = sink.tryEmitNext(i);
    if (emitResult.isFailure()) {
        throw new RuntimeException(); //Never happens
    }
    Thread.sleep(500);
}

T2 立即打印“T2 DONE”,因为

Flux
看似已完成,但后续的每个
sink.tryEmitNext
仍然成功 (
isSuccessful() == true
)。

我显然无法理解

Sinks.many().multicast()
应该如何工作,因为这种行为对我来说毫无意义。当
Flux
从未被调用时,是什么完成了
(try)EmitComplete
?如果没有任何东西可以订阅它,为什么接收器仍然成功发射?它陷入了一个既完整又不完整的困境。

我错过了什么?我是否应该使用不同的结构(见下文)来拥有一个无限的接收器,允许订阅者随意来去?

如果我将我的水槽定义为:

//0 because I don't actually want any replay
Sinks.Many<Integer> sink = Sinks.many().replay().limit(Duration.ZERO);

我得到了我期望的结果 - T2 也得到了它的 5 个元素。

java multithreading concurrency spring-webflux project-reactor
1个回答
0
投票

此行为的原因是

onBackpressureBuffer()
的预期行为,默认情况下启用
autoCancel
(尽管方法文档本身中未记录),一旦所有订阅者取消订阅即可完成接收器。

要禁用它,请使用重载:

onBackpressureBuffer(int bufferSize, boolean autoCancel)

示例:

Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
© www.soinside.com 2019 - 2024. All rights reserved.