Java反应器-链Mono ,还有另一个异步任务产生Mono

问题描述 投票:0回答:1

我有以下异步任务:

public class AsyncValidationTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono.empty()
    public Mono<Void> execute(Object o);
}
public class AsyncSaveTask {
    // Returns Mono.error(new Exception()) if error, otherwise Mono of Object
    public Mono<Object> execute(Object o);
}

以及下面的服务类别:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   // Right now, the problem is that when validation completes successfully, it 
                   // emits Mono.empty hence the flatMap chained below will not be invoked at all.
                   .flatMap(dontcare -> this.save.execute(o))
    }
}

如上所示,如果flatMap成功完成,我尝试使用AsyncSaveTask.execute链接AsyncValidationTask.execute调用,它将无法工作,因为完成后没有任何内容发出(Mono.empty)。

我也考虑将then链接到第二个调用,但是无论第一个验证调用产生了Mono.error,它总是将调用链接的调用。

如何正确链接它们?

java reactive-programming reactor
1个回答
0
投票

.then仅用于终端源链接

使用.then,以便使用仅发送终端信号的过程执行。

此外,请注意,如果您需要对错误信号进行某些操作,则必须事先将.thenonErrorResume一起使用。

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(this.save.execute(o))
    }
}

[.defer为了推迟单声道创建

为了仅在成功验证的情况下执行this.save.execute(o),也必须将其包装在Mono.defer中:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(Mono.defer(() -> this.save.execute(o)))
    }
}

正确设计您的API

[通常,确保能正常工作的API并将该工作公开为Publisher(例如Mono | Flux)是一种很好的做法,然后,API创建者必须确保仅执行工作如果用户已订阅给定的Publisher实例。

例如,如果您的异步API在下面执行CompletableFuture创建,则值得将您的CompletableFuture创建手动包装到Mono.defer或使用适当的方法扩展,例如Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)

© www.soinside.com 2019 - 2024. All rights reserved.