我的问题与下面的代码非常相似,在另一个 webclient 调用中进行 webclient 调用。
String r = Mono.just(Mono.just("Hello, world!").toFuture().get()).toFuture().get();
我们在反应式框架之上运行我们的代码。
我试图将 Mono 移交给 ForkJoinPool 以避免 WebTestClient 返回 IllegalStateException: block()/blockFirst()/blockLast() 正在阻塞,这在线程情况下不支持。
.toFuture().get()
工作正常,但不适用于内部 Mono。 import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
public static void main(String[] args) throws ExecutionException, InterruptedException {
String result = WebClient.create("http://localhost:1234/primary")
.get()
.exchangeToMono(response -> {
System.out.println("Outer code is called");
try {
WebClient.create("http://localhost:1234/secondary") //localhost:1234/token")
.get()
.exchangeToMono(clientResponse -> {
System.out.println("Inner code is not called ever! ");
return clientResponse.bodyToMono(String.class);
})
.toFuture().get(); // This works fine when there is no inner code Mono
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("This Outer code is never reached");
return response.bodyToMono(String.class);
})
.toFuture().get();
System.out.println("testResult" + result);
}
这里mono外面就是框架线程;现在调用我的代码(内部 Mono)。
当我使用线程转储调试代码时
我也没有看到任何僵局。 如果有人在这里阐明一些观点,那将会非常有帮助。
更新_1:
看起来 Mono.toFuture().get() 不会将工作移交给 ForkJoinPool
由于这段代码
Mono.just("Hello, world!").toFuture().get()
在我的调试中没有创建任何ForkJoinPool线程。
令人惊讶的是,它甚至没有使用任何其他线程,在同一个主线程中运行。
更新_2:
虽然我不太明白,但在 toFuture 之前调用
Schedulers.parallel())
看起来是在单独的线程中执行的。
.subscribeOn(Schedulers.parallel())
.toFuture().get();
示例:下面是在不同线程中工作
Mono.just("red")
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.parallel())
.toFuture().get();
而下面的代码在同一个主线程上工作
Mono.just("red")
.log()
.map(String::toUpperCase)
.toFuture().get();
您的代码不需要阻塞反应器线程,因此没有理由使用
.get
或 .block
:
Mono<String> result = WebClient.create("http://localhost:1234/primary")
.get()
.exchangeToMono(response -> {
return WebClient.create("http://localhost:1234/secondary")
.get()
.retrieve()
// it seems you're interested only in the primary response?
.then(response.bodyToMono(String.class))
});
// this code is outside of reactor and you can stop and wait if you want
System.out.println("testResult" + result.block());