Can anyone tell me why Mono.toFuture().get() never executes when it is called on the thread that is executing another Mono?
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactiveTest {

    static MockServer server = new MockServer();

    public static void main(String[] args) throws InterruptedException, IOException {
        server.setUp();
        WebClient.create("http://localhost:1234/primary")
            .get()
            .exchangeToMono(x -> x.bodyToMono(String.class))
            .subscribe(
                response -> {
                    System.out.println("First Mono Executing Thread: " + Thread.currentThread());
                    try {
                        CompletableFuture<String> res =
                            WebClient.create("http://localhost:1234/secondary")
                                .get()
                                .exchangeToMono(x -> x.bodyToMono(String.class))
                                .log()
                                .map(
                                    x -> {
                                        System.out.println("Second Mono Executing Thread: " + Thread.currentThread());
                                        return x;
                                    })
                                .toFuture();
                        System.out.println("Second Mono with .toFuture().get() is never executed!: " + res.get());
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                });
        Thread.sleep(10000);
        server.tearDown();
        System.out.println("Main ended! But Second Mono with .toFuture().get() is never executed after waiting for 10 seconds!");
    }

    static class MockServer {
        final Dispatcher dispatcher =
            new Dispatcher() {
                @Override
                public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
                    switch (request.getPath()) {
                        case "/primary":
                            return new MockResponse().setResponseCode(200).setBody("{\"hello\":\"primary\"}");
                        case "/secondary":
                            return new MockResponse().setResponseCode(200).setBody("{\"hello\":\"secondary\"}");
                    }
                    return new MockResponse().setResponseCode(404);
                }
            };

        MockWebServer server;

        void setUp() {
            server = new MockWebServer();
            server.setDispatcher(dispatcher);
            try {
                server.start(1234);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        void tearDown() throws IOException {
            server.shutdown();
        }
    }
}
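Although I don't fully understand why, calling .subscribeOn(Schedulers.parallel()) before toFuture().get() seems to work, and the second Mono then executes on a separate thread: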
.subscribeOn(Schedulers.parallel())
.toFuture().get();
Example: the following runs on a different thread:
Mono.just("red")
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.parallel())
.toFuture().get();
whereas the following runs on the same main thread:
Mono.just("red")
.log()
.map(String::toUpperCase)
.toFuture().get();
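A likely explanation for the hang in the question is that res.get() blocks the very thread that would have to process the second response, so the future can never complete. The sketch below is only an analogy built on plain java.util.concurrent, not on Reactor Netty itself: a single-threaded executor stands in for one event-loop thread, and the class name, method names, and the 500 ms timeout are all made up for illustration. Moving the inner work to a second executor plays roughly the role that subscribeOn(Schedulers.parallel()) seems to play above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SingleThreadBlockingDemo {

    // Runs an outer task on outerPool. The outer task schedules an inner task
    // on innerPool and then blocks waiting for its result, much like calling
    // toFuture().get() inside a subscribe callback.
    static String runNested(ExecutorService outerPool, ExecutorService innerPool) {
        CompletableFuture<String> outer = CompletableFuture.supplyAsync(() -> {
            CompletableFuture<String> inner =
                CompletableFuture.supplyAsync(() -> "secondary", innerPool);
            try {
                // A timeout is used so the demo terminates instead of hanging forever.
                return inner.get(500, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                return "timed out";
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, outerPool);
        return outer.join();
    }

    // Inner task is queued behind the outer task on the same single thread,
    // so it cannot start while the outer task is blocked.
    static String sameThread() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            return runNested(loop, loop);
        } finally {
            loop.shutdownNow();
        }
    }

    // Inner task runs on its own thread, so the blocking wait can succeed.
    static String separateThread() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return runNested(loop, worker);
        } finally {
            loop.shutdownNow();
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("same thread:     " + sameThread());
        System.out.println("separate thread: " + separateThread());
    }
}
```

Note that offloading only avoids the self-wait; the outer thread is still blocked for the duration of the call, which is generally undesirable on an event-loop thread.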
Your code doesn't need to block the Reactor thread, so there is no reason to use .get() or .block() inside the pipeline:
Mono<String> result = WebClient.create("http://localhost:1234/primary")
    .get()
    .exchangeToMono(primaryResponse -> {
        // chain the second request instead of blocking on it;
        // the inner lambda parameter needs its own name to compile
        return WebClient.create("http://localhost:1234/secondary")
            .get()
            .retrieve()
            .toEntity(String.class)
            .map(entity -> entity.getBody());
    });
// this code is outside of reactor and you can stop and wait if you want
System.out.println("testResult" + result.block());
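If a CompletableFuture is genuinely required (for example because toFuture() was already called), the same idea can be expressed by composing futures rather than calling get() inside a callback. The following is a minimal sketch using only the JDK; callPrimary and callSecondary are hypothetical stand-ins for the two WebClient requests and simply return the mock bodies from the question.

```java
import java.util.concurrent.CompletableFuture;

public class NonBlockingChainDemo {

    // Hypothetical stand-in for the request to /primary.
    static CompletableFuture<String> callPrimary() {
        return CompletableFuture.supplyAsync(() -> "{\"hello\":\"primary\"}");
    }

    // Hypothetical stand-in for the request to /secondary; the primary body
    // is accepted as input but not used in this sketch.
    static CompletableFuture<String> callSecondary(String primaryBody) {
        return CompletableFuture.supplyAsync(() -> "{\"hello\":\"secondary\"}");
    }

    // Chains the second call onto the first without blocking in between.
    static CompletableFuture<String> chained() {
        return callPrimary().thenCompose(NonBlockingChainDemo::callSecondary);
    }

    public static void main(String[] args) {
        // Blocking here is acceptable: main is not an event-loop thread.
        System.out.println("testResult" + chained().join());
    }
}
```

thenCompose is used here in roughly the way flatMap is used on a Mono: the second call starts only once the first has produced a value, and no thread waits in between.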