如何使用spring webClient完成所有http请求的等待时间?

问题描述 投票:1回答:1

我想为每个队列元素执行http请求。这些请求应并行调用。另外,我需要等待所有请求的终止。

我开发了以下代码:

 List<Mono<MyResponseDTO>> monoList = queue.stream()
                .map(jobStatusBunch -> webClient
                        .post()
                        .uri("localhost:8080/api/some/url")
                        .bodyValue(convertToRequestDto(someBean))
                        .retrieve()
                        .toEntity(String.class)
                        .filter(HttpEntity::hasBody)
                        .map(stringResponseEntity -> {
                            try {
                                return objectMapper.readValue(stringResponseEntity.getBody(), MyResponseDTO.class);
                            } catch (JsonProcessingException e) {
                                log.error("Can't parse", e);
                                return null;
                            }
                        })
                        .doOnNext(myResponseDTO -> {
                            log.info("doOnNext is invoked");
                        })
                ).collect(Collectors.toList());
          //await when all MONOs are completed

log.info("Start waiting for {}", monoList);
Mono<Void> mono = Flux.fromIterable(monoList)
        .flatMap(Function.identity())
        .then();
log.info("Finished waiting for {}", monoList);

并且当队列只有一个元素时,我会看到以下日志:

2019-11-19 19:17:17.733  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Start waiting for [MonoPeek]
2019-11-19 19:17:25.988  INFO 5896 --- [   scheduling-1] c.b.m.service.MyService     : Finished waiting for [MonoPeek]
2019-11-19 19:17:26.015 TRACE 5896 --- [   scheduling-1] o.s.w.r.f.client.ExchangeFunctions       : [c42c1c2] HTTP POST localhost:8080/api/some/url, headers={}
2019-11-19 19:17:48.230  INFO 5896 --- [tor-http-nio-11] c.b.m.service.MyService     : doOnNext is invoked

因此此代码不允许等待请求终止。

我该如何实现?

P.S。

看来Flux.merge(monoList).blockLast()是我需要的东西。它可以正常工作吗?

java resttemplate spring-webflux spring-webclient spring-reactor
1个回答
0
投票

您可以尝试以下操作:

Flux<MyResponseDTO> responses = queue.stream()
     .flatMap(jobStatusBunch -> webClient
                    .post()
                    .uri("localhost:8080/api/some/url")
                    .bodyValue(convertToRequestDto(someBean))
                    .retrieve()
                    .toEntity(MyResponseDTO.class));

 Mono<Void> workDone = response.then();

这很简单,应该做的。默认情况下(如果我没记错的话),订阅者将请求256元素,这意味着您将获得最多256个并行处理的HTTP请求。这可能取决于在HTTP客户端上配置的连接池。默认情况下,在Reactor Netty上,TCP通道的最大数量大于该数量。

[包括flatMap的各种Reactor运算符提供了带有concurrency方法参数的变量,以控制那里的最大并发性。

您的带有Flux.merge且列表为Mono的解决方案将是等效的。另一方面,使用Flux.concat并不是您要查找的内容,因为它会在请求元素时订阅Mono,因此您可能无法获得所需的最大并发性。

© www.soinside.com 2019 - 2024. All rights reserved.