Project Reactor 中 doOnNext 的“即发即忘”操作

问题描述 投票:0回答:2

我有一个

Flux
流。对于处理的每个元素,我希望触发一个异步/非阻塞操作。例如,从数据库更新返回
Mono
的方法。 我希望此操作在
doOnNext
块上完成。 我不想影响
Flux
、那里实施的处理和背压。

假设要调用的

Mono
方法是

Mono<Integer> dbUpdate();

我的

Flux
应该是这样吗?

public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)              
                        .doOnNext(data -> dbUpdate(data));
}

或者应该如堆栈溢出示例中提到的那样。

public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)              
                        .doOnNext(data -> dbUpdate(data).subscribe());
}

以上不会导致内部阻塞问题吗

doOnNext

哪个调度程序最适合用于此类操作?

java spring reactive-programming spring-webflux project-reactor
2个回答
1
投票
如果您不订阅,

dbUpdate()
将被忽略。以下代码片段不会打印任何内容,因为
Mono.just("db update")
没有被订阅。

Mono<String> dbUpdate() {
    return Mono.just("db update")
        .doOnNext(System.out::println);
}

public Flux<String> processData() {
    return Flux.just("item 1", "item 2")
        .doOnNext(data -> dbUpdate());
}

请注意,

.subscribe()
不会阻塞您的线程,它会开始工作并立即返回。


0
投票

请注意,如果您想确保触发的

Mono
在单独的线程中运行,您应该使用
.subscribeOn(...)
而不仅仅是
.subscribe()

比较两个测试:

普通
subscribe()

@SneakyThrows
private void sleep(Duration duration) {
    Thread.sleep(duration);
}

@Test
void nestedMonoSubscribe() {
    Mono.just("Main")
            .doOnNext(s -> Mono.just("Nested")
                    .doOnNext(__ -> sleep(Duration.ofSeconds(1)))
                    .doOnNext(System.out::println)
                    .subscribe()) // uses the same thread, "Nested" is printed before "Main"
            .doOnNext(System.out::println)
            .subscribe();
    sleep(Duration.ofSeconds(2));
}

输出:

Nested
Main

subscribeOn(...)

@Test
void nestedMonoSubscribeOn() {
    Mono.just("Main")
            .doOnNext(s -> Mono.just("Nested")
                    .doOnNext(__ -> sleep(Duration.ofSeconds(1)))
                    .doOnNext(System.out::println)
                    .subscribeOn(Schedulers.boundedElastic())
                    .subscribe()) // uses another thread, "Nested" is printed after "Main"
            .doOnNext(System.out::println)
            .subscribe();
    sleep(Duration.ofSeconds(2));
}

输出:

Main
Nested

另请参阅

© www.soinside.com 2019 - 2024. All rights reserved.