Mono.defer() 的真实情况是什么?

问题描述 投票:0回答:3

我知道

Mono.defer()
的作用,但我应该什么时候使用它?我知道其中一个用例是在返回
Mono
的函数中推迟一些阻塞副作用,但这通常是一种不好的做法(将副作用放在返回
Mono
Flux
的函数中)。当我想在
Mono
中包装一些阻塞代码时,有
Mono.fromCallable()
。那么使用
Mono.defer()
的最佳情况是什么?

java reactive-programming reactor reactive-streams
3个回答
0
投票

Mono.fromCallable()
用于定期延迟服务呼叫。
Mono.defer()
产生一个
Mono
并且只有当您 subscribe 时它才会这样做。因此,您可以决定不在组合阶段创建这个或那个
Mono
,而是恰好在目标订阅上创建。


0
投票

有时你需要懒惰地同步计算一些东西。

例如,我们有一个异步服务

public interface SomeService {

    Mono<User> getUserByName(String name);

    Mono<CreateUserResult> createUser(User user);

}

和一个端点,它在此服务中查找用户并在找不到用户时创建。但是在创建之前,必须在特殊线程池中同步执行一些繁重的计算。所以

someService.getUserByName(name)
                .switchIfEmpty(Mono.defer(() -> {
                    var someHeavyCalculatedData = localHeavyService.doSomethingNecessary(name);
                    return someService.createUser(new User(name, someHeavyCalculatedData));
                }).subscribeOn(specialScheduler));

当然,你可以说我们可以用

Mono.fromCallable
来代替

someService.getUserByName(name)
                .switchIfEmpty(Mono.fromCallable(() -> localHeavyService.doSomethingNecessary(name))
                        .subscribeOn(specialScheduler)
                        .flatMap(someHeavyCalculatedData -> someService.createUser(new User(name, someHeavyCalculatedData)))
                );

但是反应链中的每个方法调用都会导致在内存中创建大量对象。好吧,你打一个电话,可能不会注意到任何事情,但是随着负载的增加,频繁分配内存,然后由 GC 释放它会导致你的应用程序变慢。


0
投票

@a.khakh

一个对我有帮助的非常简单的用例是在另一个线程中执行阻塞代码,也就是说,处理那个阻塞调用,这样主线程就不会阻塞我。

我看到需要在另一个线程中使用 defer 来订阅,在本例中是在项目反应器提供并由 Simon Basle 编程的调度器中。

如果您将

defer
subscribeOn
Schedulers.boundedElastic
结合起来,您将使您的代码成为非阻塞的,或者更确切地说,它不会 block 您的应用程序,消除当时 Reactor 的默认调度程序中的争用,也许是调度程序.平行


下图是一个Combo的第三个组件,内部使用了一个defer加一个Scheduler,以免应用阻塞

  • 忽略称为
    ui.access()
    的方法它是vaadin框架的一部分。
Mono.defer(() -> this.reactiveRandomNumbers.monoFrecuency(event.getValue().getSize()))
                        .doOnEach(signal -> log.info("Thread name doOnNext(): {}", Thread.currentThread().getName()))
                        .subscribeOn(Schedulers.boundedElastic())
                        .subscribe(subscribeMap -> {
                            ui.access(() -> {
                                log.info("Thread name subscribe(): {}", Thread.currentThread().getName());
                                this.execute(event.getValue().getSize(), e -> subscribeMap);
                            });
                        });
© www.soinside.com 2019 - 2024. All rights reserved.