我有一个
Flux
流。对于处理的每个元素,我希望触发一个异步/非阻塞操作。例如,从数据库更新返回 Mono
的方法。
我希望此操作在 doOnNext
块上完成。
我不想影响Flux
、那里实施的处理和背压。
假设要调用的
Mono
方法是
Mono<Integer> dbUpdate();
我的
Flux
应该是这样吗?
public Flux<Data> processData(PollRequest request)
{
return searchService.search(request)
.doOnNext(data -> dbUpdate(data));
}
或者应该如堆栈溢出示例中提到的那样。
public Flux<Data> processData(PollRequest request)
{
return searchService.search(request)
.doOnNext(data -> dbUpdate(data).subscribe());
}
以上不会导致内部阻塞问题吗
doOnNext
?
哪个调度程序最适合用于此类操作?
dbUpdate()
将被忽略。以下代码片段不会打印任何内容,因为 Mono.just("db update")
没有被订阅。
Mono<String> dbUpdate() {
return Mono.just("db update")
.doOnNext(System.out::println);
}
public Flux<String> processData() {
return Flux.just("item 1", "item 2")
.doOnNext(data -> dbUpdate());
}
请注意,
.subscribe()
不会阻塞您的线程,它会开始工作并立即返回。
请注意,如果您想确保触发的
Mono
在单独的线程中运行,您应该使用 .subscribeOn(...)
而不仅仅是 .subscribe()
。
比较两个测试:
subscribe()
:@SneakyThrows
private void sleep(Duration duration) {
Thread.sleep(duration);
}
@Test
void nestedMonoSubscribe() {
Mono.just("Main")
.doOnNext(s -> Mono.just("Nested")
.doOnNext(__ -> sleep(Duration.ofSeconds(1)))
.doOnNext(System.out::println)
.subscribe()) // uses the same thread, "Nested" is printed before "Main"
.doOnNext(System.out::println)
.subscribe();
sleep(Duration.ofSeconds(2));
}
输出:
Nested
Main
subscribeOn(...)
:@Test
void nestedMonoSubscribeOn() {
Mono.just("Main")
.doOnNext(s -> Mono.just("Nested")
.doOnNext(__ -> sleep(Duration.ofSeconds(1)))
.doOnNext(System.out::println)
.subscribeOn(Schedulers.boundedElastic())
.subscribe()) // uses another thread, "Nested" is printed after "Main"
.doOnNext(System.out::println)
.subscribe();
sleep(Duration.ofSeconds(2));
}
输出:
Main
Nested
另请参阅