Spring Webclient 将 Mono 交给 CompletableFuture

问题描述 投票:0回答:1

我的问题与下面的代码非常相似,在另一个 webclient 调用中进行 webclient 调用。

 String r = Mono.just(Mono.just("Hello, world!").toFuture().get()).toFuture().get();

我们在反应式框架之上运行我们的代码。

我试图将 Mono 移交给 ForkJoinPool 以避免 WebTestClient 返回 IllegalStateException: block()/blockFirst()/blockLast() 正在阻塞,这在线程情况下不支持。

当只有外部 Mono 时,

.toFuture().get()
工作正常,但不适用于内部 Mono。
我的意思是请看一下这段代码

import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.util.concurrent.ExecutionException;


    public static void main(String[] args) throws ExecutionException, InterruptedException {

        String result = WebClient.create("http://localhost:1234/primary")
                .get()
                .exchangeToMono(response -> {
                    System.out.println("Outer code is called");
                    try {
                        WebClient.create("http://localhost:1234/secondary") //localhost:1234/token")
                                .get()
                                .exchangeToMono(clientResponse -> {
                                    System.out.println("Inner code is not called ever! ");
                                    return clientResponse.bodyToMono(String.class);
                                })
                                .toFuture().get(); // This works fine when there is no inner code Mono
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                    System.out.println("This Outer code is never reached");
                    return response.bodyToMono(String.class);
                })
                .toFuture().get();
        System.out.println("testResult" + result);
    }

这里mono外面就是框架线程;现在调用我的代码(内部 Mono)。

当我使用线程转储调试代码时

  1. 我看到主线程和reactor-nio线程都在等待,并且从未完成。继续挂着。
  2. 在我看到没有 ForkJoinPool 本身创建之后。然后我添加了一些与ForkJoinPool(CompletableFuture)相关的代码来触发池。仍然相同,两条线都挂着。

我也没有看到任何僵局。 如果有人在这里阐明一些观点,那将会非常有帮助。


更新_1:

看起来 Mono.toFuture().get() 不会将工作移交给 ForkJoinPool

由于这段代码

Mono.just("Hello, world!").toFuture().get()
在我的调试中没有创建任何ForkJoinPool线程。

令人惊讶的是,它甚至没有使用任何其他线程,在同一个主线程中运行。


更新_2:

虽然我不太明白,但在 toFuture 之前调用

Schedulers.parallel())
看起来是在单独的线程中执行的。

            .subscribeOn(Schedulers.parallel())
            .toFuture().get();

示例:下面是在不同线程中工作

    Mono.just("red")
            .log()
            .map(String::toUpperCase)
            .subscribeOn(Schedulers.parallel())
            .toFuture().get();

而下面的代码在同一个主线程上工作

    Mono.just("red")
            .log()
            .map(String::toUpperCase)
            .toFuture().get();
java spring-webflux reactive-programming project-reactor spring-webclient
1个回答
0
投票

您的代码不需要阻塞反应器线程,因此没有理由使用

.get
.block
:

Mono<String> result = WebClient.create("http://localhost:1234/primary")
    .get()
    .exchangeToMono(response -> {
        return WebClient.create("http://localhost:1234/secondary")
            .get()
            .retrieve()
            // it seems you're interested only in the primary response?
            .then(response.bodyToMono(String.class))
    });
  
    // this code is outside of reactor and you can stop and wait if you want
    System.out.println("testResult" + result.block());
© www.soinside.com 2019 - 2024. All rights reserved.