我正在尝试使用WebClient
执行列表请求,然后过滤它们以找到第一个成功的列表请求(如果有)并返回。如果失败,则返回默认响应。
我面临的问题是,当我在.collectList()
上调用Flux<ServerResponse>
时,列表始终为空。基于我之前发出的请求数,我希望该列表包含N个ServerResponse
数。
public Mono<ServerResponse> retry(ServerRequest request) {
return Flux.fromIterable(request.headers().header(SEQUENCE_HEADER_NAME))
.map(URI::create)
// Build a "list" of responses
.flatMap(uri -> webClientBuilder.baseUrl(uri.toString()).build()
.method(Objects.requireNonNull(request.method()))
.headers(headers -> request.headers().asHttpHeaders().forEach((key, values) -> {
if (!SEQUENCE_HEADER_NAME.equals(key)) {
headers.addAll(key, values);
}
}))
.body(BodyInserters.fromDataBuffers(request.body(BodyExtractors.toDataBuffers())))
.exchange()
.flatMap(clientResponse -> ServerResponse.status(clientResponse.statusCode())
.headers(headers -> headers.addAll(clientResponse.headers().asHttpHeaders()))
.body(BodyInserters.fromDataBuffers(clientResponse.body(BodyExtractors.toDataBuffers()))))
)
// "Wait" for all of them to complete so we can filter
.collectList()
.flatMap(clientResponses -> {
List<ServerResponse> filteredResponses = clientResponses.stream()
.filter(response -> response.statusCode().is2xxSuccessful())
.collect(Collectors.toList());
if (filteredResponses.isEmpty()) {
log.error("No request succeeded; defaulting to {}", HttpStatus.BAD_REQUEST.toString());
return ServerResponse.badRequest().build();
}
if (filteredResponses.size() > 1) {
log.error("Multiple requests succeeded; defaulting to {}", HttpStatus.BAD_REQUEST.toString());
return ServerResponse.badRequest().build();
}
return Mono.just(filteredResponses.get(0));
});
}
关于.collectList()
为什么总是返回空列表的任何想法?
[好吧,在我看来,您有一个困惑的要求,就是您希望First
Mono
能够响应,但是您正在尝试将该功能放入Flux
中,该功能旨在有效地处理流程中的所有项目。 Webflux中的Mono
旨在创建一个流程,该流程将对流程中的项目进行高效的一系列转换。对于成功的第一个URI,测试您的一堆URI并不需要WebFlux有什么用,所以我不得不质疑为什么要尝试将其强加到框架中。
[您可能会争辩说Flux
为您提供更好的异步处理,但是当一堆WebClient
调用时,我认为情况并非如此。 WebClient
仍然是HTTP,因此流程中的每个项目都在WebClient
附近停止并开始。如果要异步执行HTTP,则应使用ThreadPool
和Callable
。