我正在使用Spring react作为服务器来生成昂贵的代,并以Flux的方式逐一返回结果。如果取消了请求,这具有停止生成的优势(例如,由于约束而过于严格)。我的代码如下所示:
public Flux<Entity> generate(int nbrOfEntitiesToGenerate, Constaints constraints) {
return Flux.range(0, nbrOfEntitiesToGenerate)
.map(x -> Generator.expensiveGeneration(constraints)
// .subscribeOn(Schedulers.parallel())
;
}
这仅完成我想要的一半,被取消时我不会拨打下一个电话expensiveGeneration
,但不会停止当前运行的昂贵发电,如果约束太紧,该发电可能永远无法完成。我该怎么做。
额外的问题,如果您知道,我该如何在parralel中生成x个实体,以最大程度地利用线程(当然,不立即启动所有世代)。
提前感谢。
从Scheduler
创建ExecutorService
很简单,但是如果要取消,则需要保存可调用的Future<?>
。我更改了Generator
使其保留并包装了cancel
方法,该方法在Flux
处理doOnCancel
时被调用。
public class FluxPlay {
public static void main(String[] args) {
new FluxPlay().run();
}
private void run() {
Flux<LocalDateTime> f = generate(10);
Disposable d = f.subscribeOn(Schedulers.single()).subscribe(System.out::println);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
d.dispose();
}
private Flux<LocalDateTime> generate(int nbrOfEntitiesToGenerate) {
ExecutorService es = Executors.newSingleThreadExecutor();
Generator generator = new Generator(es);
return Flux.range(0, nbrOfEntitiesToGenerate)
.map(x -> generator.expensiveGeneration())
.doOnCancel(()->{
generator.cancel();
es.shutdown();
})
.publishOn(Schedulers.fromExecutor(generator::submit));
}
}
和:
public class Generator {
Future<?> f;
ExecutorService es;
public Generator(ExecutorService es) {
this.es = es;
}
public void submit(Runnable command) {
f = es.submit(command);
}
public void cancel() {
f.cancel(true);
es.shutdown();
}
public LocalDateTime expensiveGeneration() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
return LocalDateTime.now();
}
}