如何在Project Reactor 3中将冷流转换为热流?

问题描述 投票:2回答:2

根据Mono和Flux的定义,两者都代表一个异步的数据序列,在订阅之前没有任何事情发生。

有两个广泛的出版商家庭:冷热。 Mono和Flux为每个订阅重新生成数据。如果未创建订阅,则永远不会生成数据。

另一方面,Hot发布商不依赖于任何数量的订阅者。

这是我的冷流代码:

        System.out.println("*********Calling coldStream************");
        Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
                .doOnNext(System.out::println)
                .filter(s -> s.startsWith("l"))
                .map(String::toUpperCase);

        source.subscribe(d -> System.out.println("Subscriber 1: "+d));
        source.subscribe(d -> System.out.println("Subscriber 2: "+d));
        System.out.println("-------------------------------------");

这是输出:

*********Calling composeStream************
ram
sam
dam
lam
Subscriber 1: LAM
ram
sam
dam
lam
Subscriber 2: LAM
-------------------------------------

如何将上述冷流转换为热流?

reactive-programming publish-subscribe project-reactor reactive-streams
2个回答
3
投票

您可以通过在冷流上调用“发布”将冷流转换为热流,它将创建一个ConnectableFlux。因为它是一个热流,所以在你调用connect方法之前不会发生任何事情,即使你订阅了。试试这个例子:

   Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
            .doOnNext(System.out::println)
            .filter(s -> s.startsWith("l"))
            .map(String::toUpperCase);

    ConnectableFlux<String> connectable = source.publish();
    connectable.subscribe(d -> System.out.println("Subscriber 1: "+d));
    connectable.subscribe(d -> System.out.println("Subscriber 2: "+d));
    connectable.connect();

输出是:

ram sam dam lam订户1:LAM订户2:LAM

第二个例子:

 Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
            .doOnNext(System.out::println)
            .filter(s -> s.startsWith("l"))
            .map(String::toUpperCase);

    ConnectableFlux<String> connectable = source.publish();
    connectable.subscribe(d -> System.out.println("Subscriber 1: "+d));
    connectable.connect();
    connectable.subscribe(d -> System.out.println("Subscriber 2: "+d));

输出是:

ram sam dam lam订户1:LAM

通过这两个示例,您可以看到数据从称为“连接”方法的那一刻开始流动


1
投票

热门发布商不依赖于任何数量的订阅者。他们可能会立即开始发布数据,并且每当新的订阅者进入时都会继续这样做(在这种情况下,所述订阅者只会看到订阅后发出的新元素)。 Reactor中的大多数其他热门发布者都扩展了Processor(本例中为UnicastProcessor)。

以下是我们如何实现这一目标

    System.out.println("*********Calling hotStream************");
    UnicastProcessor<String> hotSource = UnicastProcessor.create();

    Flux<String> hotFlux = hotSource.publish()
                                    .autoConnect()
                                    .map(String::toUpperCase);


    hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

    hotSource.onNext("ram");
    hotSource.onNext("sam");

    hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

    hotSource.onNext("dam");
    hotSource.onNext("lam");
    hotSource.onComplete();
    System.out.println("-------------------------------------");

以下是此输出:

*********Calling hotStream************
Subscriber 1 to Hot Source: RAM
Subscriber 1 to Hot Source: SAM
Subscriber 1 to Hot Source: DAM
Subscriber 2 to Hot Source: DAM
Subscriber 1 to Hot Source: LAM
Subscriber 2 to Hot Source: LAM
-------------------------------------

订户1捕获所有四种颜色。在生成前两种颜色之后创建的订户2仅捕获最后两种颜色。无论何时附加订阅,操作员在此Flux上描述的过程都会运行。

© www.soinside.com 2019 - 2024. All rights reserved.