通过共享运算符将可连接的 Flux 转换为 Hot

问题描述 投票:0回答:1

我有下面的程序,它首先创建一个区间通量,取 5 个元素并订阅它。
发布后,我使用 replay 运算符和 auto connect 2 将其转换为可连接的通量,然后将其转换为热发布者。但是,无论我添加多少订阅者,任何后续订阅都不会收到任何数据。 (这里用sleep来表示热流的效果)

        Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(5);
        flux.subscribe();
        flux = flux.replay().autoConnect(2).share();   //hot publisher     
        flux.subscribe(aLong -> System.out.println("first " + aLong));   //no data
        sleep(2000);
        flux.subscribe(aLong -> System.out.println("second " + aLong));  //no data

值得注意的是,如果我将自动连接设置为 1(有效地使其成为正常通量),则会观察到预期的行为(输出如下所示)。

输出:
第一个 0
第一个
第 2
第二个
前 3
第二个 3
前 4
第二个 4

如果我错了,请澄清或纠正我。

java project-reactor reactive-streams
1个回答
0
投票
 Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2);
    flux.subscribe(aLong -> System.out.println("source " + aLong));
    flux = flux.replay().autoConnect(2).share();   //hot publisher
    flux.subscribe(aLong -> System.out.println("first " + aLong));   
    sleep(2000);
    flux.subscribe(aLong -> System.out.println("second " + aLong));

输出: 来源 0 来源 1 (这里只打印源代码,因为我们在订阅后调用 share())

 Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2);
    flux.replay().autoConnect(3).share().
        subscribe(aLong -> System.out.println("source " + aLong));   
    flux.subscribe(aLong -> System.out.println("first " + aLong));
    sleep(2000);
    flux.subscribe(aLong -> System.out.println("second " + aLong));

输出: 第一个 0 第一个 秒 0 第二个 1 (此处未打印来源)

Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(2).replay().autoConnect(3);
    flux.share().subscribe(aLong -> System.out.println("source " + aLong));  
    flux.subscribe(aLong -> System.out.println("first " + aLong));
    sleep(2000);
    flux.subscribe(aLong -> System.out.println("second " + aLong));

输出: 来源 0 第一个 0 秒 0 来源 1 第一个 第二个 1

观察这 3 个场景及其输出,这很重要,在这里您何时何地订阅和共享

© www.soinside.com 2019 - 2024. All rights reserved.