我想为每个队列元素执行http请求。这些请求应并行调用。另外,我需要等待所有请求的终止。
我开发了以下代码:
List<Mono<MyResponseDTO>> monoList = queue.stream()
.map(jobStatusBunch -> webClient
.post()
.uri("localhost:8080/api/some/url")
.bodyValue(convertToRequestDto(someBean))
.retrieve()
.toEntity(String.class)
.filter(HttpEntity::hasBody)
.map(stringResponseEntity -> {
try {
return objectMapper.readValue(stringResponseEntity.getBody(), MyResponseDTO.class);
} catch (JsonProcessingException e) {
log.error("Can't parse", e);
return null;
}
})
.doOnNext(myResponseDTO -> {
log.info("doOnNext is invoked");
})
).collect(Collectors.toList());
//await when all MONOs are completed
log.info("Start waiting for {}", monoList);
Mono<Void> mono = Flux.fromIterable(monoList)
.flatMap(Function.identity())
.then();
log.info("Finished waiting for {}", monoList);
并且当队列只有一个元素时,我会看到以下日志:
2019-11-19 19:17:17.733 INFO 5896 --- [ scheduling-1] c.b.m.service.MyService : Start waiting for [MonoPeek]
2019-11-19 19:17:25.988 INFO 5896 --- [ scheduling-1] c.b.m.service.MyService : Finished waiting for [MonoPeek]
2019-11-19 19:17:26.015 TRACE 5896 --- [ scheduling-1] o.s.w.r.f.client.ExchangeFunctions : [c42c1c2] HTTP POST localhost:8080/api/some/url, headers={}
2019-11-19 19:17:48.230 INFO 5896 --- [tor-http-nio-11] c.b.m.service.MyService : doOnNext is invoked
因此此代码不允许等待请求终止。
我该如何实现?
看来Flux.merge(monoList).blockLast()
是我需要的东西。它可以正常工作吗?
您可以尝试以下操作:
Flux<MyResponseDTO> responses = queue.stream()
.flatMap(jobStatusBunch -> webClient
.post()
.uri("localhost:8080/api/some/url")
.bodyValue(convertToRequestDto(someBean))
.retrieve()
.toEntity(MyResponseDTO.class));
Mono<Void> workDone = response.then();
这很简单,应该做的。默认情况下(如果我没记错的话),订阅者将请求256
元素,这意味着您将获得最多256个并行处理的HTTP请求。这可能取决于在HTTP客户端上配置的连接池。默认情况下,在Reactor Netty上,TCP通道的最大数量大于该数量。
[包括flatMap
的各种Reactor运算符提供了带有concurrency
方法参数的变量,以控制那里的最大并发性。
您的带有Flux.merge
且列表为Mono
的解决方案将是等效的。另一方面,使用Flux.concat
并不是您要查找的内容,因为它会在请求元素时订阅Mono
,因此您可能无法获得所需的最大并发性。