Mono.just() 在我的用例中当发出的元素挂起时仍然阻塞

问题描述 投票:0回答:1

我有两个测试使用 sleep() 模拟需要时间处理的 api 调用,并测试 Mono.just() 是否使其成为非阻塞。

在我的第一个测试中,我直接发出了一个字符串,但在 map() 中使其阻塞。

    @Test
    public void testMono() throws InterruptedException {

        for (int i = 0; i < 10; i++) {
            Mono.just("Hello World")
                    .map(s -> {
                        System.out.println("Thread:" + Thread.currentThread().getName() + " " + s);
                        try {
                            sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return s.length();
                    })
                    .map(integer -> {
                        try {
                            sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return integer.toString();
                    })
                    .subscribeOn(Schedulers.parallel())

                    .subscribe(System.out::println);

            System.out.println("Loop: " + i + " " + "after MONO");
        }

        sleep(10000);
    }

结果预计是非阻塞的,因为所有输出

System.out.println("Loop: " + i + " " + "after MONO");
同时出现。

但是,在第二个测试中,我将发出的元素从“Hello world”替换为阻塞的 getString() 方法。将 sleep() 放在 getString() 中的目的是,我想模拟需要时间来获取发出的元素的场景。 结果现在是阻塞的,因为输出在完成接收发射的元素后一个一个地显示出来。

    @Test
    public void testMono2() throws InterruptedException {

        for (int i = 0; i < 10; i++) {
            Mono.just(getString())
                    .map(s -> {
                        System.out.println("Thread:" + Thread.currentThread().getName() + " " + s);
                        try {
                            sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return s.length();
                    })
                    .map(integer -> {
                        try {
                            sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return integer.toString();
                    })
                    .subscribeOn(Schedulers.parallel())

                    .subscribe(System.out::println);

            System.out.println("Loop: " + i + " " + "after MONO");
        }

        sleep(10000);
    }

    public String getString(){
        try {
            sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello world";
    }

在第三个测试中,它也是非阻塞的。

    int count = 0;

    @Test
    public void testFluxSink() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            System.out.println("Start Loop: " + i);
            int curLoop = count++;
            Flux.create(fluxSink -> {
                try {
                    System.out.println("Loop: " + curLoop + " " + " start sleeping");
                    sleep(5000);
                    System.out.println("Loop: " + curLoop + " " + " end sleeping");
                    fluxSink.next(onNext());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).subscribeOn(Schedulers.parallel()).subscribe(s -> {
                System.out.println(Thread.currentThread());
                System.out.println("Loop: " + curLoop + " " + "completed receiving" + " " + s);
            });
        }
        sleep(100000);
    }

    public String onNext() throws InterruptedException {
        sleep(5000);
        return "onNext";
    }

想知道我是不是对reactor的概念理解有误,想知道第二次测试怎么用错了?

java project-reactor nonblocking reactive-streams
1个回答
1
投票

首先,了解 Reactor 不会执行任何从阻塞调用到非阻塞调用的神奇转换是至关重要的。要制作一个端到端的非阻塞应用程序,您需要使用非阻塞驱动程序、NIO 等。如果您将阻塞代码包装到

Mono
Flux
,线程仍然会被阻塞。

至于你的第二个例子,在

getString
方法中睡眠。有两点需要注意:

第一个:你用

Mono.just
。重要的是它通常用于立即提供已知值。这意味着
getString
计算不是发生在反应器链的范围内,而是发生在反应器链组装阶段的主线程中。这就是您看到“顺序”行为的原因。如果将其替换为
Mono.fromCallable
,则计算将在反应器链(并行调度程序线程)中进行,您将看到与 1 或 3 示例中相同的行为。

第二个:重要的是要注意,将您的

getString
方法包装到
Mono.fromCallable
不会使您的代码成为非阻塞的。您的
sleep
仍然会停止并行调度程序的线程。在您的生产代码中,您的
getString
方法可能是一些数据库调用或其他服务调用,它们应该通过非阻塞驱动程序或基于 NIO 的库(如 Netty)完成。要模拟它,请使用
delayElement
而不是
sleep
,它以非阻塞方式工作。这是一个例子:

public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 3; i++) {
        getString()
            .map(s -> {
                System.out.println(Instant.now() + " Thread:" + Thread.currentThread().getName() + " " + s);
                return s.length();
            })
            .subscribeOn(Schedulers.parallel())
            .subscribe();
        System.out.println(Instant.now() + " Loop: " + i + " " + "after MONO");
    }

    TimeUnit.SECONDS.sleep(10);
}

public static Mono<String> getString() {
    return Mono.delay(Duration.ofSeconds(3))
        .map(it -> "hello world");
}

它打印

2023-05-17T20:31:12.099464Z Loop: 0 after MONO
2023-05-17T20:31:12.112953Z Loop: 1 after MONO
2023-05-17T20:31:12.113145Z Loop: 2 after MONO
2023-05-17T20:31:15.105750Z Thread:parallel-2 hello world
2023-05-17T20:31:15.117944Z Thread:parallel-5 hello world
2023-05-17T20:31:15.117944Z Thread:parallel-6 hello world
© www.soinside.com 2019 - 2024. All rights reserved.