Spring Reactor Merge 与 Concat

问题描述 投票:0回答:4

我正在使用 Spring Reactor,我看不出

concat
merge
运算符

之间有任何区别

这是我的例子

    @Test
    public void merge() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.merge(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);
    }

    @Test
    public void concat() {
        Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        Flux<String> flux3 = Flux.just("world");
        Flux.concat(flux1, flux2, flux3)
                .map(String::toUpperCase)
                .subscribe(System.out::println);    
}

两者的行为完全相同。有人可以解释这两种操作之间的区别吗?

java spring flux reactor
4个回答
17
投票

合并和连接之间的本质区别在于,在合并中,两个流都是实时的。在连接的情况下,第一个流被终止,然后另一个流被连接到它。

连接


合并


13
投票

API 文档中已经提到了差异,虽然 concat 首先完全读取一个 Flux,然后将第二个 Flux 附加到该 Flux,但 merge 运算符不能保证两个 Flux 之间的顺序。

为了看到差异,请修改您的 merge() 代码,如下所示。

例如下面是示例代码

//Flux with Delay
@Test
public void merge() {
    Flux<String> flux1 = Flux.just("Hello", "Vikram");

    flux1 = Flux.interval(Duration.ofMillis(3000))
    .zipWith(flux1, (i, msg) -> msg);
    
    
    Flux<String> flux2 = Flux.just("reactive");
    flux2 = Flux.interval(Duration.ofMillis(2000))
            .zipWith(flux2, (i, msg) -> msg);
    
    Flux<String> flux3 = Flux.just("world");
    Flux.merge(flux1, flux2, flux3)
            .subscribe(System.out::println);

    try {
        Thread.sleep(8000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

当您修改 Flux.interval 持续时间(当前设置为 3000 毫秒)时,您将看到 merge() 的输出不断变化。但使用 concat() 时,输出将始终相同。


3
投票

另一个值得注意的区别是 concat 的所有变体仅在第一个流终止后才延迟订阅第二个流。

而合并的所有变体都热切地订阅发布者(所有发布者一起订阅)

运行以下代码突出显示了这一点:

//Lazy subscription of conact
Flux.concat(Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(500)),
                Flux.just(10, 20, 30, 40).delayElements(Duration.ofMillis(500)))
                .subscribe(System.out::println, System.out::println);



//Eager subscription of the merge. Also, try mergeSequential.
Flux.merge(Flux.range(500, 3).delayElements(Duration.ofMillis(500)),
                Flux.range(-500, 3).delayElements(Duration.ofMillis(300)))
                .subscribe(System.out::println, System.out::println);

0
投票

如果有两个流: 第一出版商 =

Flux.just("a","b","c");
第二出版商 =
Flux.just("d","e","f");

concat() 将首先订阅第一个发布者,在订阅第一个发布者之后,它将把第二个发布者连接到第一个发布者。所以输出如下:

output : "a","b","c","d","e","f"

在 merge() 中,发布者急切地订阅,并且合并以交错的方式发生。所以输出如下:

output : "a","d","b","e","c","f"
© www.soinside.com 2019 - 2024. All rights reserved.