如何与Project Reactor并行调用两个或多个Web服务或REST并加入答案

问题描述 投票:2回答:3

您好我想知道如何在pararelo中调用两个或更多Web服务或Rest服务并撰写调用的响应。

我在网上找到了一些使用其他技术的例子,但我不能让它与反应堆一起工作

// start task A asynchronously
CompletableFuture<ResponseA> futureA = asyncServiceA.someMethod(someParam);
// start task B asynchronously
CompletableFuture<ResponseB> futureB = asyncServiceB.someMethod(someParam);

CompletableFuture<String> combinedFuture = futureA
        .thenCombine(futureB, (a, b) -> a.toString() + b.toString());

// wait till both A and B complete
String finalValue = combinedFuture.join();

////////////////////////////////////////////////////////////////////////////////

static void Run()
{
    //Follow steps at this link for addding a reference to the necessary .NET library:
    //http://stackoverflow.com/questions/9611316/system-net-http-missing-from-    
    //namespace-using-net-4-5

    //Create an HTTP Client
    var client = new HttpClient();

    //Call first service
    var task1 = client.GetAsync("http://www.cnn.com");

    //Call second service
    var task2 = client.GetAsync("http://www.google.com");

    //Create list of all returned async tasks
    var allTasks = new List<Task<HttpResponseMessage>> { task1, task2 };

    //Wait for all calls to return before proceeding
    Task.WaitAll(allTasks.ToArray());

}
java spring spring-boot project-reactor
3个回答
2
投票

让我们假设你需要点击2个服务,所以你需要2个基本的qazxsw poi(每个都配置了正确的基本URL,例如一个认证方案):

WebClient

从那里开始,假设您在控制器中注入了这两个Web客户端(作为限定bean)。这是使用Reactor对两者进行联合调用的代码:

@Bean
public WebClient serviceAClient(String authToken) {
    return WebClient.builder()
            .baseUrl("http://serviceA.com/api/v2/")
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Basic " + authToken)
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
}

@Bean
public WebClient serviceBClient(String authToken): WebClient {
    return WebClient.builder()
            .baseUrl("https://api.serviceB.com/")
            .defaultHeader(HttpHeaders.AUTHORIZATION, "token " + authToken)
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
}

请注意,zip函数可以生成更有意义的内容,就像2个响应中的业务对象一样。生成的Mono<ResponseA> respA = webclientA.get() .uri("/sub/path/" + foo) .retrieve() .bodyToMono(ResponseA.class); Mono<ResponseB> respB = webclientB.get() .uri("/path/for/b") .retrieve() .bodyToMono(ResponseB.class); Mono<String> join = respA.zipWith(respB, (a, b) -> a.toString + b.toString); return join; 只会在订阅它的情况下触发2个请求(在Spring WebFlux的情况下,如果从控制器方法返回它,框架将执行此操作)。


1
投票

如果您使用的是Spring reactor,那么您需要的是运营商Zip,以便运行您的流程并将其压缩一次。

Mono<String>

你可以在这里看到有关反应技术的更多信息/** * Zip operator execute the N number of Flux independently, and once all them are finished, results * are combined in TupleN object. */ @Test public void zip() { Flux<String> flux1 = Flux.just("hello "); Flux<String> flux2 = Flux.just("reactive"); Flux<String> flux3 = Flux.just(" world"); Flux.zip(flux1, flux2, flux3) .map(tuple3 -> tuple3.getT1().concat(tuple3.getT2()).concat(tuple3.getT3())) .map(String::toUpperCase) .subscribe(value -> System.out.println("zip result:" + value)); }


0
投票

如果您已经有一个同步实现,您可以通过https://github.com/politrons/reactive方法轻松添加一些reactor功能,使其并行运行。

Mono.fromCallable()

Mono<ResponseA> responseA = Mono .fromCallable(() -> blockingserviceA.getResponseA()) .subscribeOn(Schedulers.elastic()); // will execute on a separate thread when called Mono<ResponseB> responseB = Mono .fromCallable(() -> blockingserviceB.getResponseB()) .subscribeOn(Schedulers.elastic()); // At that point nothing has been called yet, responseA and responseB are empty Mono AggregatedStuff aggregatedStuff = Mono.zip(responseA, responseB) // zip as many Mono as you want .flatMap(r -> doStuff(r.getT1(), r.getT2())) // do whatever needed with the results .block(); // execute the async calls, and then execute flatMap transformation fromCallable()之间需要注意的重要事项是,just()将直接执行并在主线程中执行,但just()是懒惰的,这意味着它只会在需要时执行,例如:当你调用block(),collect()时(对于Flux) ),......等......

fromCallable()

因此,请避免将Mono<ResponseA> responseA = Mono .just(blockingserviceA.getResponseA()) // execute directly Mono<ResponseB> responseB = Mono .just(blockingserviceB.getResponseB()) // execute directly // Above code have been executed sequentially, you can access response result // => yes it is kind of useless, and yes it is exactly how I did it the first time! 用于要并行运行的繁重任务。使用just()进行实例化是完全正确的,因为每次实例化String或任何其他对象时,您都不希望创建新线程并获得随之而来的开销。

PS:正如SimonBaslé指出的那样你可以使用WebClient直接返回Mono和Flux并进行异步调用,但是如果你已经实现了api客户端并且没有重构整个应用程序的选项,just()是一个简单的方法设置异步过程而不重构很多代码。

© www.soinside.com 2019 - 2024. All rights reserved.