Spring WebClient .toFuture().get() never completes when called inside another Mono


Can someone tell me why Mono.toFuture().get() never completes when it is called on the thread that is executing another Mono?

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactiveTest {

  static MockServer server = new MockServer();

  public static void main(String[] args) throws InterruptedException, IOException {

    server.setUp();
    WebClient.create("http://localhost:1234/primary")
        .get()
        .exchangeToMono(x -> x.bodyToMono(String.class))
        .subscribe(
            response -> {
              System.out.println("First Mono Executing Thread: " + Thread.currentThread());
              try {
                CompletableFuture<String> res =
                    WebClient.create("http://localhost:1234/secondary")
                        .get()
                        .exchangeToMono(x -> x.bodyToMono(String.class))
                        .log()
                        .map(
                            x -> {
                              System.out.println("Second Mono Executing Thread: " + Thread.currentThread());
                              return x;
                            })
                        .toFuture();
                System.out.println("Second Mono with .toFuture().get() is never executed!: " + res.get());
              } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
              }
            });
    Thread.sleep(10000);
    server.tearDown();
    System.out.println("Main ended! But Second Mono with .toFuture().get() is never executed after waiting for 10 seconds!");
  }

  static class MockServer {
    final Dispatcher dispatcher =
        new Dispatcher() {

          @Override
          public MockResponse dispatch(RecordedRequest request) throws InterruptedException {

            switch (request.getPath()) {
              case "/primary":
                return new MockResponse().setResponseCode(200).setBody("{\"hello\":\"primary\"}");
              case "/secondary":
                return new MockResponse().setResponseCode(200).setBody("{\"hello\":\"secondary\"}");
            }
            return new MockResponse().setResponseCode(404);
          }
        };


    MockWebServer server;
    void setUp() {
      server = new MockWebServer();
      server.setDispatcher(dispatcher);
      try {
        server.start(1234);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    void tearDown() throws IOException {
      server.shutdown();
    }
  }
}

I don't fully understand why, but calling .subscribeOn(Schedulers.parallel()) before toFuture().get() seems to work: the second Mono then executes on a separate thread.

            .subscribeOn(Schedulers.parallel())
            .toFuture().get();

Example: the following runs on a different thread:

    Mono.just("red")
            .log()
            .map(String::toUpperCase)
            .subscribeOn(Schedulers.parallel())
            .toFuture().get();

whereas the following runs on the same main thread:

    Mono.just("red")
            .log()
            .map(String::toUpperCase)
            .toFuture().get();
java spring-webflux reactive-programming project-reactor spring-webclient
1 answer

Your code does not need to block the Reactor thread, so there is no reason to use .get or .block there:

Mono<String> result = WebClient.create("http://localhost:1234/primary")
    .get()
    .exchangeToMono(response -> {
        return WebClient.create("http://localhost:1234/secondary")
            .get()
            .retrieve()               
            .toEntity(String.class)
            .map(entity -> entity.getBody());
    });
    
  
// this code is outside of reactor and you can stop and wait if you want
System.out.println("testResult" + result.block());
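
As a rough illustration of why blocking inside the first callback can hang, here is a sketch that uses only java.util.concurrent, not Reactor or Netty. It rests on an assumption the question does not confirm: that the second request's signals are delivered on the same single event-loop thread that is already parked in res.get(). The class and method names (EventLoopBlockingDemo, runNested, sameThread, separateThread) are hypothetical, and the one-second timeout exists only so the demo terminates instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class EventLoopBlockingDemo {

  // The outer task stands in for the first subscribe callback. It submits an
  // inner task to innerPool and blocks on the result for at most one second.
  static String runNested(ExecutorService outerPool, ExecutorService innerPool)
      throws Exception {
    Future<String> outer =
        outerPool.submit(
            () -> {
              Future<String> inner = innerPool.submit(() -> "inner done");
              try {
                return inner.get(1, TimeUnit.SECONDS);
              } catch (TimeoutException e) {
                // The inner task never got a thread to run on.
                inner.cancel(true);
                return "timed out";
              }
            });
    return outer.get();
  }

  // Both tasks share one thread: the inner task is queued behind the outer
  // task, which is waiting for it, so the wait can only end by timing out.
  public static String sameThread() throws Exception {
    ExecutorService loop = Executors.newSingleThreadExecutor();
    try {
      return runNested(loop, loop);
    } finally {
      loop.shutdownNow();
    }
  }

  // The inner task runs on its own thread, so the blocked outer task
  // receives the result.
  public static String separateThread() throws Exception {
    ExecutorService loop = Executors.newSingleThreadExecutor();
    ExecutorService worker = Executors.newSingleThreadExecutor();
    try {
      return runNested(loop, worker);
    } finally {
      loop.shutdownNow();
      worker.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("same thread: " + sameThread());
    System.out.println("separate thread: " + separateThread());
  }
}
```

The second case only loosely corresponds to adding .subscribeOn(Schedulers.parallel()): the work the blocked thread is waiting for is no longer queued behind it. Composing the two calls without blocking, as in the answer above, avoids the problem entirely.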