我有下面的程序,它首先创建一个区间通量,取 5 个元素并订阅它。
发布后,我使用 replay 运算符和 auto connect 2 将其转换为可连接的通量,然后将其转换为热发布者。但是,无论我添加多少订阅者,任何后续订阅都不会收到任何数据。 (这里用sleep来表示热流的效果)
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(5);
flux.subscribe();
flux = flux.replay().autoConnect(2).share(); //hot publisher
flux.subscribe(aLong -> System.out.println("first " + aLong)); //no data
sleep(2000);
flux.subscribe(aLong -> System.out.println("second " + aLong)); //no data
值得注意的是,如果我将自动连接设置为 1(有效地使其成为正常通量),则会观察到预期的行为(输出如下所示)。
输出:
第一个 0
第一个
第 2
第二个
前 3
第二个 3
前 4
第二个 4
如果我错了,请澄清或纠正我。
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2);
flux.subscribe(aLong -> System.out.println("source " + aLong));
flux = flux.replay().autoConnect(2).share(); //hot publisher
flux.subscribe(aLong -> System.out.println("first " + aLong));
sleep(2000);
flux.subscribe(aLong -> System.out.println("second " + aLong));
输出: 来源 0 来源 1 (这里只打印源代码,因为我们在订阅后调用 share())
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2);
flux.replay().autoConnect(3).share().
subscribe(aLong -> System.out.println("source " + aLong));
flux.subscribe(aLong -> System.out.println("first " + aLong));
sleep(2000);
flux.subscribe(aLong -> System.out.println("second " + aLong));
输出: 第一个 0 第一个 秒 0 第二个 1 (此处未打印来源)
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2).replay().autoConnect(3);
flux.share().subscribe(aLong -> System.out.println("source " + aLong));
flux.subscribe(aLong -> System.out.println("first " + aLong));
sleep(2000);
flux.subscribe(aLong -> System.out.println("second " + aLong));
输出: 来源 0 第一个 0 秒 0 来源 1 第一个 第二个 1
观察这 3 个场景及其输出,这很重要,在这里您何时何地订阅和共享