如何在Spring Flux中停止昂贵的计算

问题描述 投票:0回答:1

我正在使用Spring react作为服务器来生成昂贵的代,并以Flux的方式逐一返回结果。如果取消了请求,这具有停止生成的优势(例如,由于约束而过于严格)。我的代码如下所示:

    public Flux<Entity> generate(int nbrOfEntitiesToGenerate, Constaints constraints) {
        return Flux.range(0, nbrOfEntitiesToGenerate)
            .map(x -> Generator.expensiveGeneration(constraints)
//            .subscribeOn(Schedulers.parallel())
            ;
    }

这仅完成我想要的一半,被取消时我不会拨打下一个电话expensiveGeneration,但不会停止当前运行的昂贵发电,如果约束太紧,该发电可能永远无法完成。我该怎么做。

额外的问题,如果您知道,我该如何在parralel中生成x个实体,以最大程度地利用线程(当然,不立即启动所有世代)。

提前感谢。

rx-java spring-webflux project-reactor spring-reactive
1个回答
0
投票

Scheduler创建ExecutorService很简单,但是如果要取消,则需要保存可调用的Future<?>。我更改了Generator使其保留并包装了cancel方法,该方法在Flux处理doOnCancel时被调用。

public class FluxPlay {
    public static void main(String[] args) {
        new FluxPlay().run();
    }
    private void run() {
        Flux<LocalDateTime> f = generate(10);
        Disposable d = f.subscribeOn(Schedulers.single()).subscribe(System.out::println);
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        d.dispose();
    }

    private Flux<LocalDateTime> generate(int nbrOfEntitiesToGenerate) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        Generator generator = new Generator(es);
        return Flux.range(0, nbrOfEntitiesToGenerate)
        .map(x -> generator.expensiveGeneration())
        .doOnCancel(()->{
            generator.cancel();
            es.shutdown();
        })
        .publishOn(Schedulers.fromExecutor(generator::submit));
    }
}

和:

public class Generator {
    Future<?> f;
    ExecutorService es;
    public Generator(ExecutorService es) {
        this.es = es;
    }
    public void submit(Runnable command) {
      f = es.submit(command);
    }
    public void cancel() {
        f.cancel(true);
        es.shutdown();
    }
    public LocalDateTime expensiveGeneration() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {}
        return LocalDateTime.now();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.