为什么额外的 log() 会影响用于计算 fromCallable() 的线程池?

问题描述 投票:0回答:1

我正在玩 Project Reactor 并面临不直观的行为。

举以下例子:

    Mono.fromCallable(() -> calculate())
    //.log()
      .publishOn(Schedulers.boundedElastic())
      .doOnNext(i -> System.out.println("thread: " +   Thread.currentThread().getName()))
      .block();

当我取消注释

log()
时,
calculate()
main
线程上运行,但是当我将其留在其中时,该方法将在
boundedElastic
上执行。为什么
log()
会改变 Mono 的行为?

java project-reactor
1个回答
0
投票

如果您查看

publishOn
方法的实现,您会注意到它有一个特定的情况,即在
Callable
之后立即调用它。在这种情况下,它返回
MonoSubscribeOnCallable
并且可调用函数在传递给
publishOn
的调度程序上执行。这就是您看到这种行为的原因。

但是,如果您在

.log
fromCallable
之间添加
publishOn
运算符,则不适用这种特殊情况。这是因为
this
指针现在指向
MonoLogFuseable
,因此,
MonoPublishOn
以默认行为返回。

public final Mono<T> publishOn(Scheduler scheduler) {
    if(this instanceof Callable) {
        if (this instanceof Fuseable.ScalarCallable) {
            try {
                T value = block();
                return onAssembly(new MonoSubscribeOnValue<>(value, scheduler));
            }
            catch (Throwable t) {
                //leave MonoSubscribeOnCallable defer error
            }
        }
        @SuppressWarnings("unchecked")
        Callable<T> c = (Callable<T>)this;
        return onAssembly(new MonoSubscribeOnCallable<>(c, scheduler));
    }
    return onAssembly(new MonoPublishOn<>(this, scheduler));
}

这是引入此更改的提交:https://github.com/reactor/reactor-core/commit/41d9dae7256ffc36a07db7a9f5fa4d95182a5ad9

不幸的是,我找不到任何票证参考或解释来清楚地理解为什么它会这样工作。

© www.soinside.com 2019 - 2024. All rights reserved.