关于如何在方法中添加延迟但以非阻塞方式添加延迟的小问题。
模拟长流程的一种非常流行的方法是使用
Thread.sleep();
然而,对于Reactor项目来说,这是一个阻塞操作。
众所周知,在反应式项目中,我们不应该阻塞。
我想尝试和模拟长流程。某种方法将花费大量时间,但以非阻塞方式,无需交换线程。这是为了模拟一个非常冗长的方法,但被 BlockHound 等证明是非阻塞的。
这种结构非常受欢迎:
@Test
public void simulateLengthyProcessingOperationReactor() {
Flux.range(1,5000)
.map(a -> simulateLengthyProcessingOperation(a))
.subscribe(System.out::println);
}
public String simulateLengthyProcessingOperation(Integer input) {
simulateDelayBLOCKING();
return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
}
public void simulateDelayBLOCKING() {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
但它正在阻塞。 (我知道有
Mono.fromCallable(() ->
,但这不是问题)
是否可以做同样的事情,模拟延迟,但非阻塞? 另外,
.delay
不会达到预期的结果(在同一个反应式管道上模拟非阻塞的冗长方法)
@Test
public void simulateLengthyProcessingOperationReactor() {
Flux.range(1,5000)
.map(a -> simulateLengthyProcessingOperation(a))
.subscribe(System.out::println);
}
public String simulateLengthyProcessingOperation(Integer input) {
simulateDelay_NON_blocking();
return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
}
public void simulateDelay_NON_blocking() {
//simulate lengthy process, but WITHOUT blocking
}
谢谢你
当然可以,有一系列方法
.delay...()
例如,您可以在此处阅读有关
delayElements()
方法的信息:
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#delayElements-java.time.Duration-
你应该知道它会将执行线程切换到另一个
Scheduler
。
信号被延迟并在并行默认调度程序上继续。
在最简单的情况下,它看起来像这样:
public void simulateLengthyProcessingOperationReactor() {
Flux.range(1,5000)
.delayElements(Duration.ofMillis(1000L)) // delay each element for 1000 millis
.subscribe(System.out::println);
}
根据您的情况,您可以这样编写代码:
@Test
public void simulateLengthyProcessingOperationReactor() {
Flux.range(1,5000)
.concatMap(this::simulateDelay_NON_blocking)
.subscribe(System.out::println);
}
public Mono<String> simulateDelay_NON_blocking(Integer input) {
//simulate lengthy process, but WITHOUT blocking
return Mono.delay(Duration.ofMillis(1000L))
.map(__ -> String.format("[%d] on thread [%s] at time [%s]",
input, Thread.currentThread().getName(), new Date()));
}