我正在使用 Spring Reactor,我看不出
concat
和 merge
运算符 之间有任何区别
这是我的例子
@Test
public void merge() {
Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux3 = Flux.just("world");
Flux.merge(flux1, flux2, flux3)
.map(String::toUpperCase)
.subscribe(System.out::println);
}
@Test
public void concat() {
Flux<String> flux1 = Flux.just("hello").doOnNext(value -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux2 = Flux.just("reactive").doOnNext(value -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Flux<String> flux3 = Flux.just("world");
Flux.concat(flux1, flux2, flux3)
.map(String::toUpperCase)
.subscribe(System.out::println);
}
两者的行为完全相同。有人可以解释这两种操作之间的区别吗?
API 文档中已经提到了差异,虽然 concat 首先完全读取一个 Flux,然后将第二个 Flux 附加到该 Flux,但 merge 运算符不能保证两个 Flux 之间的顺序。
为了看到差异,请修改您的 merge() 代码,如下所示。
例如下面是示例代码
//Flux with Delay
@Test
public void merge() {
Flux<String> flux1 = Flux.just("Hello", "Vikram");
flux1 = Flux.interval(Duration.ofMillis(3000))
.zipWith(flux1, (i, msg) -> msg);
Flux<String> flux2 = Flux.just("reactive");
flux2 = Flux.interval(Duration.ofMillis(2000))
.zipWith(flux2, (i, msg) -> msg);
Flux<String> flux3 = Flux.just("world");
Flux.merge(flux1, flux2, flux3)
.subscribe(System.out::println);
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
当您修改 Flux.interval 持续时间(当前设置为 3000 毫秒)时,您将看到 merge() 的输出不断变化。但使用 concat() 时,输出将始终相同。
另一个值得注意的区别是 concat 的所有变体仅在第一个流终止后才延迟订阅第二个流。
而合并的所有变体都热切地订阅发布者(所有发布者一起订阅)
运行以下代码突出显示了这一点:
//Lazy subscription of conact
Flux.concat(Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(500)),
Flux.just(10, 20, 30, 40).delayElements(Duration.ofMillis(500)))
.subscribe(System.out::println, System.out::println);
//Eager subscription of the merge. Also, try mergeSequential.
Flux.merge(Flux.range(500, 3).delayElements(Duration.ofMillis(500)),
Flux.range(-500, 3).delayElements(Duration.ofMillis(300)))
.subscribe(System.out::println, System.out::println);
如果有两个流: 第一出版商 =
Flux.just("a","b","c");
第二出版商 = Flux.just("d","e","f");
concat() 将首先订阅第一个发布者,在订阅第一个发布者之后,它将把第二个发布者连接到第一个发布者。所以输出如下:
output : "a","b","c","d","e","f"
在 merge() 中,发布者急切地订阅,并且合并以交错的方式发生。所以输出如下:
output : "a","d","b","e","c","f"