我有以下异步任务:
public class AsyncValidationTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
public Mono<Void> execute(Object o);
}
public class AsyncSaveTask {
// Returns Mono.error(new Exception()) if error, otherwise Mono of Object
public Mono<Object> execute(Object o);
}
以及下面的服务类别:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
// Right now, the problem is that when validation completes successfully, it
// emits Mono.empty hence the flatMap chained below will not be invoked at all.
.flatMap(dontcare -> this.save.execute(o))
}
}
如上所示,如果flatMap
成功完成,我尝试使用AsyncSaveTask.execute
链接AsyncValidationTask.execute
调用,它将无法工作,因为完成后没有任何内容发出(Mono.empty)。
我也考虑将then
链接到第二个调用,但是无论第一个验证调用产生了Mono.error,它总是将调用链接的调用。
如何正确链接它们?
.then
仅用于终端源链接使用.then
,以便链使用仅发送终端信号的过程执行。
此外,请注意,如果您需要对错误信号进行某些操作,则必须事先将.then
与onErrorResume
一起使用。
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(this.save.execute(o))
}
}
.defer
为了推迟单声道创建为了仅在成功验证的情况下执行this.save.execute(o)
,也必须将其包装在Mono.defer
中:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(Mono.defer(() -> this.save.execute(o)))
}
}
[通常,确保能正常工作的API并将该工作公开为Publisher
(例如Mono
| Flux
)是一种很好的做法,然后,API创建者必须确保仅执行工作如果用户已订阅给定的Publisher
实例。
例如,如果您的异步API在下面执行CompletableFuture
创建,则值得将您的CompletableFuture
创建手动包装到Mono.defer
或使用适当的方法扩展,例如Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)