为什么在 Flux 发射完元素后,Reactor 项目中的 Schedulers.newParallel() 没有停止运行?

问题描述 投票:0回答:1

我有一个

Flux
的原始
String
,并在
main()
方法中运行此代码。

package com.example;
    
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;
    
import java.util.Arrays;
import java.util.List;
    
public class Parallel {
    
  private static final Logger log = Loggers.getLogger(Parallel.class.getName());

  private static List<String> COLORS = Arrays.asList("red", "white", "blue");

  public static void main(String[] args) throws InterruptedException {
    Flux<String> flux = Flux.fromIterable(COLORS);
    flux
      .log()
      .map(String::toUpperCase)
      .subscribeOn(Schedulers.newParallel("sub"))
      .publishOn(Schedulers.newParallel("pub", 1))
      .subscribe(value -> {
        log.info("==============Consumed: " + value);
      });
  }
}

如果您尝试运行此代码,该应用程序永远不会停止运行,您需要手动停止它。 如果我将

.newParallel()
替换为
.parallel()
一切都会按预期工作并且应用程序正常完成。

为什么它无法自行完成运行?为什么会挂? 这种行为的原因是什么?

如果您将此代码作为 JUnit 测试运行,它可以正常工作并且不会挂起。

java scheduler project-reactor flux
1个回答
5
投票
您使用

Scheduler

 工厂方法自行创建的 
newXxx
实例默认以 non-daemon 模式创建,这意味着它可以阻止 JVM 退出。

当所有测试都运行时,JUnit 会调用

System.exit()
,这解释了为什么测试场景不会挂起。

在这种情况下,

Schedulers.newSingle()
Schedulers.newParallel()
变体是最糟糕的“罪犯”,因为创建的线程在不活动超时后不会被剔除,这与
Schedulers.newBoundedElastic()
不同。

如果在现实场景中,您有一个明确定义的应用程序生命周期,您可以将

Scheduler
实例存储在某处(例如作为 beans),并确保每个
Scheduler#dispose()
在应用程序生命周期结束时被调用。

更简单的解决方案:使用相关工厂重载通过

Schedulers
显式创建
daemon == true

© www.soinside.com 2019 - 2024. All rights reserved.