我正在使用 spring webflux 的网络客户端,如下所示:
WebClient.create()
.post()
.uri(url)
.syncBody(body)
.accept(MediaType.APPLICATION_JSON)
.headers(headers)
.exchange()
.flatMap(clientResponse -> clientResponse.bodyToMono(tClass));
运作良好。 我现在想要处理我正在调用的 Web 服务的错误(Ex 500 内部错误)。通常我会在“流”上添加一个 doOnError 并使用 Throwable 来测试状态代码,
但我的问题是我想获取网络服务提供的正文,因为它向我提供了一条我想使用的消息。
我希望无论发生什么情况都执行 flatMap 并测试自己的状态代码以反序列化或不反序列化主体。
我更喜欢使用ClientResponse提供的方法来处理http错误并抛出异常:
WebClient.create()
.post()
.uri( url )
.body( bodyObject == null ? null : BodyInserters.fromValue( bodyObject ) )
.accept( MediaType.APPLICATION_JSON )
.headers( headers )
.exchange()
.flatMap( clientResponse -> {
//Error handling
if ( clientResponse.statusCode().isError() ) { // or clientResponse.statusCode().value() >= 400
return clientResponse.createException().flatMap( Mono::error );
}
return clientResponse.bodyToMono( clazz )
} )
//You can do your checks: doOnError (..), onErrorReturn (..) ...
...
事实上,与DefaultWebClient的DefaultResponseSpec中处理错误的逻辑相同。 DefaultResponseSpec 是 ResponseSpec 的实现,如果我们使用检索()而不是交换(),我们将拥有它。
我们没有
onStatus()
吗?
public Mono<Void> cancel(SomeDTO requestDto) {
return webClient.post().uri(SOME_URL)
.body(fromObject(requestDto))
.header("API_KEY", properties.getApiKey())
.retrieve()
.onStatus(HttpStatus::isError, response -> {
logTraceResponse(log, response);
return Mono.error(new IllegalStateException(
String.format("Failed! %s", requestDto.getCartId())
));
})
.bodyToMono(Void.class)
.timeout(timeout);
}
并且:
public static void logTraceResponse(Logger log, ClientResponse response) {
if (log.isTraceEnabled()) {
log.trace("Response status: {}", response.statusCode());
log.trace("Response headers: {}", response.headers().asHttpHeaders());
response.bodyToMono(String.class)
.publishOn(Schedulers.elastic())
.subscribe(body -> log.trace("Response body: {}", body));
}
}
我通过这样做得到了错误主体:
webClient
...
.retrieve()
.onStatus(HttpStatus::isError, response -> response.bodyToMono(String.class) // error body as String or other class
.flatMap(error -> Mono.error(new RuntimeException(error)))) // throw a functional exception
.bodyToMono(MyResponseType.class)
.block();
你也可以这样做
return webClient.getWebClient()
.post()
.uri("/api/Card")
.body(BodyInserters.fromObject(cardObject))
.exchange()
.flatMap(clientResponse -> {
if (clientResponse.statusCode().is5xxServerError()) {
clientResponse.body((clientHttpResponse, context) -> {
return clientHttpResponse.getBody();
});
return clientResponse.bodyToMono(String.class);
}
else
return clientResponse.bodyToMono(String.class);
});
阅读这篇文章以获取更多示例链接,当我遇到类似的错误处理问题时,我发现它很有帮助
请注意,在撰写本文时,5xx 错误不再导致底层 Netty 层出现异常。请参阅https://github.com/spring-projects/spring-framework/commit/b0ab84657b712aac59951420f4e9d696c3d84ba2
我做了这样的事情:
Mono<ClientResponse> responseMono = requestSpec.exchange()
.doOnNext(response -> {
HttpStatus httpStatus = response.statusCode();
if (httpStatus.is4xxClientError() || httpStatus.is5xxServerError()) {
throw new WebClientException(
"ClientResponse has erroneous status code: " + httpStatus.value() +
" " + httpStatus.getReasonPhrase());
}
});
然后:
responseMono.subscribe(v -> { }, ex -> processError(ex));
利用我学到的关于“使用 Reactor 抛出异常的正确方法”的非常棒的答案,我能够将这个答案放在一起。它使用
.onStatus
、.bodyToMono
和 .handle
将错误响应正文映射到异常。
// create a chicken
webClient
.post()
.uri(urlService.getUrl(customer) + "/chickens")
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.just(chickenCreateDto), ChickenCreateDto.class) // outbound request body
.retrieve()
.onStatus(HttpStatus::isError, clientResponse ->
clientResponse.bodyToMono(ChickenCreateErrorDto.class)
.handle((error, sink) ->
sink.error(new ChickenException(error))
)
)
.bodyToMono(ChickenResponse.class)
.subscribe(
this::recordSuccessfulCreationOfChicken, // accepts ChickenResponse
this::recordUnsuccessfulCreationOfChicken // accepts throwable (ChickenException)
);
我刚刚遇到了类似的情况,我发现 webClient 不会抛出任何异常,即使它得到 4xx/5xx 响应。就我而言,我使用 webclient 首先进行调用以获取响应,如果它返回 2xx 响应,那么我从响应中提取数据并将其用于进行第二次调用。如果第一次调用得到非 2xx 响应,则抛出异常。因为它没有抛出异常,所以当第一次调用失败而第二次调用仍然进行时。所以我所做的是
return webClient.post().uri("URI")
.header(HttpHeaders.CONTENT_TYPE, "XXXX")
.header(HttpHeaders.ACCEPT, "XXXX")
.header(HttpHeaders.AUTHORIZATION, "XXXX")
.body(BodyInserters.fromObject(BODY))
.exchange()
.doOnSuccess(response -> {
HttpStatus statusCode = response.statusCode();
if (statusCode.is4xxClientError()) {
throw new Exception(statusCode.toString());
}
if (statusCode.is5xxServerError()) {
throw new Exception(statusCode.toString());
}
)
.flatMap(response -> response.bodyToMono(ANY.class))
.map(response -> response.getSomething())
.flatMap(something -> callsSecondEndpoint(something));
}
我们终于明白发生了什么: 默认情况下,Netty 的 httpclient (HttpClientRequest) 配置为在服务器错误(响应 5XX)时失败,而不是在客户端错误(4XX)时失败,这就是它总是发出异常的原因。
我们所做的是扩展 AbstractClientHttpRequest 和 ClientHttpConnector 以配置 httpclient 的行为方式,当我们调用 WebClient 时,我们使用自定义的 ClientHttpConnector :
WebClient.builder().clientConnector(new CommonsReactorClientHttpConnector()).build();
WebClient 中的retrieve() 方法抛出WebClientResponseException 每当收到状态代码 4xx 或 5xx 的响应时。
您可以通过检查响应状态码来处理异常。
Mono<Object> result = webClient.get().uri(URL).exchange().log().flatMap(entity -> {
HttpStatus statusCode = entity.statusCode();
if (statusCode.is4xxClientError() || statusCode.is5xxServerError())
{
return Mono.error(new Exception(statusCode.toString()));
}
return Mono.just(entity);
}).flatMap(clientResponse -> clientResponse.bodyToMono(JSONObject.class))
参考:https://www.callicoder.com/spring-5-reactive-webclient-webtestclient-examples/
我偶然发现了这个,所以我想我不妨发布我的代码。
我所做的是创建一个全局处理程序,处理来自 Web 客户端的请求和响应错误。这是用 Kotlin 编写的,但当然可以轻松转换为 Java。这扩展了默认行为,因此您可以确保在客户处理之上获得所有自动配置。
正如您所看到的,这并没有真正执行任何自定义操作,它只是将 Web 客户端错误转换为相关响应。对于响应错误,代码和响应正文将简单地传递到客户端。对于当前的请求错误,它仅处理连接问题,因为这就是我关心的(目前),但正如您所看到的,它可以轻松扩展。
@Configuration
class WebExceptionConfig(private val serverProperties: ServerProperties) {
@Bean
@Order(-2)
fun errorWebExceptionHandler(
errorAttributes: ErrorAttributes,
resourceProperties: ResourceProperties,
webProperties: WebProperties,
viewResolvers: ObjectProvider<ViewResolver>,
serverCodecConfigurer: ServerCodecConfigurer,
applicationContext: ApplicationContext
): ErrorWebExceptionHandler? {
val exceptionHandler = CustomErrorWebExceptionHandler(
errorAttributes,
(if (resourceProperties.hasBeenCustomized()) resourceProperties else webProperties.resources) as WebProperties.Resources,
serverProperties.error,
applicationContext
)
exceptionHandler.setViewResolvers(viewResolvers.orderedStream().collect(Collectors.toList()))
exceptionHandler.setMessageWriters(serverCodecConfigurer.writers)
exceptionHandler.setMessageReaders(serverCodecConfigurer.readers)
return exceptionHandler
}
}
class CustomErrorWebExceptionHandler(
errorAttributes: ErrorAttributes,
resources: WebProperties.Resources,
errorProperties: ErrorProperties,
applicationContext: ApplicationContext
) : DefaultErrorWebExceptionHandler(errorAttributes, resources, errorProperties, applicationContext) {
override fun handle(exchange: ServerWebExchange, throwable: Throwable): Mono<Void> =
when (throwable) {
is WebClientRequestException -> handleWebClientRequestException(exchange, throwable)
is WebClientResponseException -> handleWebClientResponseException(exchange, throwable)
else -> super.handle(exchange, throwable)
}
private fun handleWebClientResponseException(exchange: ServerWebExchange, throwable: WebClientResponseException): Mono<Void> {
exchange.response.headers.add("Content-Type", "application/json")
exchange.response.statusCode = throwable.statusCode
val responseBodyBuffer = exchange
.response
.bufferFactory()
.wrap(throwable.responseBodyAsByteArray)
return exchange.response.writeWith(Mono.just(responseBodyBuffer))
}
private fun handleWebClientRequestException(exchange: ServerWebExchange, throwable: WebClientRequestException): Mono<Void> {
if (throwable.rootCause is ConnectException) {
exchange.response.headers.add("Content-Type", "application/json")
exchange.response.statusCode = HttpStatus.BAD_GATEWAY
val responseBodyBuffer = exchange
.response
.bufferFactory()
.wrap(ObjectMapper().writeValueAsBytes(customErrorWebException(exchange, HttpStatus.BAD_GATEWAY, throwable.message)))
return exchange.response.writeWith(Mono.just(responseBodyBuffer))
} else {
return super.handle(exchange, throwable)
}
}
private fun customErrorWebException(exchange: ServerWebExchange, status: HttpStatus, message: Any?) =
CustomErrorWebException(
Instant.now().toString(),
exchange.request.path.value(),
status.value(),
status.reasonPhrase,
message,
exchange.request.id
)
}
data class CustomErrorWebException(
val timestamp: String,
val path: String,
val status: Int,
val error: String,
val message: Any?,
val requestId: String,
)
实际上,您可以在
onError
调用中轻松记录正文:
.doOnError {
logger.warn { body(it) }
}
和:
private fun body(it: Throwable) =
if (it is WebClientResponseException) {
", body: ${it.responseBodyAsString}"
} else {
""
}
对于那些希望了解触发 500 内部系统错误的 WebClient 请求的详细信息的人,请重写 DefaultErrorWebExceptionHandler,如下所示。
Spring 默认会告诉您客户端出现错误,但它不提供 WebClient 调用的主体,这在调试中非常有用。
/**
* Extends the DefaultErrorWebExceptionHandler to log the response body from a failed WebClient
* response that results in a 500 Internal Server error.
*/
@Component
@Order(-2)
public class ExtendedErrorWebExceptionHandler extends DefaultErrorWebExceptionHandler {
private static final Log logger = HttpLogging.forLogName(ExtendedErrorWebExceptionHandler.class);
public FsErrorWebExceptionHandler(
ErrorAttributes errorAttributes,
Resources resources,
ServerProperties serverProperties,
ApplicationContext applicationContext,
ServerCodecConfigurer serverCodecConfigurer) {
super(errorAttributes, resources, serverProperties.getError(), applicationContext);
super.setMessageWriters(serverCodecConfigurer.getWriters());
super.setMessageReaders(serverCodecConfigurer.getReaders());
}
/**
* Override the default error log behavior to provide details for WebClientResponseException. This
* is so that administrators can better debug WebClient errors.
*
* @param request The request to the foundation service
* @param response The response to the foundation service
* @param throwable The error that occurred during processing the request
*/
@Override
protected void logError(ServerRequest request, ServerResponse response, Throwable throwable) {
// When the throwable is a WebClientResponseException, also log the body
if (HttpStatus.resolve(response.rawStatusCode()) != null
&& response.statusCode().equals(HttpStatus.INTERNAL_SERVER_ERROR)
&& throwable instanceof WebClientResponseException) {
logger.error(
LogMessage.of(
() ->
String.format(
"%s 500 Server Error for %s\n%s",
request.exchange().getLogPrefix(),
formatRequest(request),
formatResponseError((WebClientResponseException) throwable))),
throwable);
} else {
super.logError(request, response, throwable);
}
}
private String formatRequest(ServerRequest request) {
String rawQuery = request.uri().getRawQuery();
String query = StringUtils.hasText(rawQuery) ? "?" + rawQuery : "";
return "HTTP " + request.methodName() + " \"" + request.path() + query + "\"";
}
private String formatResponseError(WebClientResponseException exception) {
return String.format(
"%-15s %s\n%-15s %s\n%-15s %d\n%-15s %s\n%-15s '%s'",
" Message:",
exception.getMessage(),
" Status:",
exception.getStatusText(),
" Status Code:",
exception.getRawStatusCode(),
" Headers:",
exception.getHeaders(),
" Body:",
exception.getResponseBodyAsString());
}
}
您必须将“Throwable e”参数强制转换为 WebClientResponseException,然后您可以调用 getResponseBodyAsString() :
WebClient webClient = WebClient.create("https://httpstat.us/404");
Mono<Object> monoObject = webClient.get().retrieve().bodyToMono(Object.class);
monoObject.doOnError(e -> {
if( e instanceof WebClientResponseException ){
System.out.println(
"ResponseBody = " +
((WebClientResponseException) e).getResponseBodyAsString()
);
}
}).subscribe();
// Display : ResponseBody = 404 Not Found
在已弃用
.exchange()
的较新版本中,您可以使用 exchangeToMono
来处理任何错误的状态代码并提供自定义值。
.exchangeToMono(
r -> {
if (r.statusCode().isError()) {
return Mono.just("Some string")
} else {
return r.bodyToMono(String.class);
}
}
);