Spring Reactor:以非阻塞方式添加延迟

问题描述 投票:0回答:1

关于如何在方法中添加延迟但以非阻塞方式添加延迟的小问题。

模拟长流程的一种非常流行的方法是使用

Thread.sleep();
然而,对于Reactor项目来说,这是一个阻塞操作。 众所周知,在反应式项目中,我们不应该阻塞。

我想尝试和模拟长流程。某种方法将花费大量时间,但以非阻塞方式,无需交换线程。这是为了模拟一个非常冗长的方法,但被 BlockHound 等证明是非阻塞的。

这种结构非常受欢迎:

@Test
    public void simulateLengthyProcessingOperationReactor() {
        Flux.range(1,5000)
                .map(a -> simulateLengthyProcessingOperation(a))
                .subscribe(System.out::println);
    }

    public String simulateLengthyProcessingOperation(Integer input) {
        simulateDelayBLOCKING();
        return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
    }

    public void simulateDelayBLOCKING() {
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

但它正在阻塞。 (我知道有

Mono.fromCallable(() ->
,但这不是问题)

是否可以做同样的事情,模拟延迟,但非阻塞? 另外,

.delay
不会达到预期的结果(在同一个反应式管道上模拟非阻塞的冗长方法)

@Test
    public void simulateLengthyProcessingOperationReactor() {
        Flux.range(1,5000)
                .map(a -> simulateLengthyProcessingOperation(a))
                .subscribe(System.out::println);
    }

    public String simulateLengthyProcessingOperation(Integer input) {
        simulateDelay_NON_blocking();
        return String.format("[%d] on thread [%s] at time [%s]", input, Thread.currentThread().getName(), new Date());
    }

    public void simulateDelay_NON_blocking() {
        //simulate lengthy process, but WITHOUT blocking
    }

谢谢你

java spring spring-webflux reactive-programming project-reactor
1个回答
5
投票

当然可以,有一系列方法

.delay...()

例如,您可以在此处阅读有关

delayElements()
方法的信息: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#delayElements-java.time.Duration-

你应该知道它会将执行线程切换到另一个

Scheduler
。 信号被延迟并在并行默认调度程序上继续。

在最简单的情况下,它看起来像这样:

public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1,5000)
            .delayElements(Duration.ofMillis(1000L)) // delay each element for 1000 millis
            .subscribe(System.out::println);
}

根据您的情况,您可以这样编写代码:

@Test
public void simulateLengthyProcessingOperationReactor() {
    Flux.range(1,5000)
            .concatMap(this::simulateDelay_NON_blocking)
            .subscribe(System.out::println);
}

public Mono<String> simulateDelay_NON_blocking(Integer input) {
    //simulate lengthy process, but WITHOUT blocking
    return Mono.delay(Duration.ofMillis(1000L))
            .map(__ -> String.format("[%d] on thread [%s] at time [%s]",
                    input, Thread.currentThread().getName(), new Date()));
}
© www.soinside.com 2019 - 2024. All rights reserved.