Flux .then() 在完成信号之前运行

问题描述 投票:0回答:3

我尝试用 Flux 流对象做一些事情,并在处理所有元素后做一些最后的工作并完成 Mono,但它不起作用:

 // data and id comming from a webrequest
 // myRepository is a org.springframework.data.r2dbc.repository.R2dbcRepository
 myRepository.findById(id)
    .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
    .then (doOnFinish(data))
    .subscribe();

Mono<DbObject> doSomethingWithDbObjectAndSave (DbObject dbo, DataObject data){
...
}
Mono<Void> doOnFinish(DataObject data){
...
}

问题: 即使我尝试这样做,函数“doOnFinish”也会在第一个元素传递 doSomethingWithDbObjectAndSave 之前被调用,但我更改了数据对象上的某些内容,并且希望在此之前执行此操作!

我尝试更改代码:

myRepository.findById(id)
        .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
        .last()
        .flatMap(dbObject  -> doOnFinish(data))
        .subscribe(); 

我希望我可以使用最后一个元素来触发 onFinish 函数,但我得到了 “flux#last() 没有观察到任何 onnext 信号”并且不要理解这一点!

有人有什么想法吗?

spring reactive-programming spring-webflux reactjs-flux flatmap
3个回答
1
投票

我认为 flatMap 在这里是多余的。相反,您可以将 doOnFinish 包装在 Mono.defer() 中:

 myRepository.findById(id)
    .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
    .then (Mono.defer(() -> doOnFinish(data)))
    .subscribe();

0
投票

then(methodCall(data))
将急切地计算参数表达式,从而在输入
methodCall
之前调用
then
。您需要一个延迟评估其参数的方法。

认为你正在寻找

doOnComplete
:

public final Flux<T> doOnComplete(Runnable onComplete)

添加 Flux 成功完成时触发的行为(副作用)。

myRepository.findById(id)
    .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
    .doOnComplete(() -> doOnFinish(data))
    .subscribe();

0
投票

哇......几个小时后我找到了解决方案。 doOnComplete 不会通过来自 doOnFinish (Mono) 的异步调用返回流。 现在我发现了这个:

myRepository.findById(id)
   .flatMap(dbObject -> doSomethingWithDbObjectAndSave(dbObject , data))
   .takeLast(1)
   .flatMap(dbObject  -> doOnFinish(data))
   .subscribe();

这对我有用......

© www.soinside.com 2019 - 2024. All rights reserved.