我有需要进行丰富的传入事件流,然后在它们到达时并行处理。
我当时认为Project Reactor是为了这份工作而订购的,但在我的测试中,所有的处理工作似乎都是连续进行的。
这是一些测试代码:
ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
.map(i-> {
System.out.println("ReactorTests.test " + Thread.currentThread());
sleep(1000L); // simulate IO delay
return String.format("String %d", i);
})
.take(3)
// .subscribeOn(Schedulers.elastic());
// .subscribeOn(Schedulers.newParallel("test"));
// .subscribeOn(Schedulers.fromExecutor(executor));
;
tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread()),
System.out::println,
()-> System.out.println("Done"));
System.out.println("DONE AND DONE");
我已经尝试取消注释每个注释行,但是在每种情况下输出都表明相同的线程用于处理所有事件
Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done
(唯一的区别是没有调度程序,它们在订阅线程上运行,而对于任何执行程序,它们都运行在同一个线程中,而不是订阅线程。)
我错过了什么?
仅供参考,有一种“睡眠”方法:
public static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
System.out.println("Exiting");
}
}
并行处理项目的一种方法是使用.parallel
/ .runOn
flux
.parallel(10)
.runOn(scheduler)
//
// Work to be performed in parallel goes here. (e.g. .map, .flatMap, etc)
//
// Then, if/when you're ready to go back to sequential, call .sequential()
.sequential()
阻塞操作(例如阻塞IO或Thread.sleep
)将阻塞执行它们的线程。反应流不能神奇地将阻塞方法转换为非阻塞方法。因此,您需要确保在适合阻塞操作的Scheduler
上运行阻塞方法(例如Schedulers.elastic()
)。
在上面的示例中,由于您知道要调用阻塞操作,因此可以使用.runOn(Schedulers.elastic())
。
根据用例,您还可以使用.flatMap
等异步运算符与.subscribeOn
或.publishOn
一起将特定的阻塞操作委托给另一个Scheduler
,如described in the project reactor docs。例如:
flux
.flatMap(i -> Mono.fromCallable(() -> {
System.out.println("ReactorTests.test " + Thread.currentThread());
sleep(1000L); // simulate IO delay
return String.format("String %d", i);
})
.subscribeOn(Schedulers.elastic()))
事实上,.flatMap
也有一个重载变量,它采用concurrency
参数,你可以限制飞行中内部序列的最大数量。在某些用例中,可以使用它代替.parallel
。然而,它通常不适用于Flux.interval
,因为Flux.interval
不支持补充比滴答更慢的下游请求。