我试图了解如何在Spring Webflux集成中在出站网关中处理错误。
在春季集成中[[没有 webflux int-http:outbound-gateway具有error-handler如下:
<int-http:outbound-gateway
http-method="GET"
url-expression="url"
expected-response-type="java.lang.String"
error-handler="accessErrorHandler"
header-mapper="headerMapper"
/>
但是在春季集成中[[withwebflux int-webflux:outbound-gateway没有错误处理程序
<int-webflux:outbound-gateway
http-method="GET"
url-expression="url"
expected-response-type="java.lang.String"
header-mapper="headerMapper"
/>
这是我对pom.xml的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
ErrorHandler
。 Spring Integration WebFlux模块完全基于Spring WebFlux基础的非阻塞WebClient
。内部逻辑基于Project Reactor类型,例如Flux
和Mono
。为了遵守反应流规范,WebFluxRequestExecutingMessageHandler
仅返回Mono
进行响应。如果在与服务器交互期间出现一些错误,我们可以在此提供:
requestSpec.exchange()
.flatMap(response -> {
HttpStatus httpStatus = response.statusCode();
if (httpStatus.isError()) {
return response.body(BodyExtractors.toDataBuffers())
.reduce(DataBuffer::write)
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return bytes;
})
.defaultIfEmpty(new byte[0])
.map(bodyBytes -> {
throw new WebClientResponseException(
"ClientResponse has erroneous status code: "
+ httpStatus.value() + " "
+ httpStatus.getReasonPhrase(),
httpStatus.value(),
httpStatus.getReasonPhrase(),
response.headers().asHttpHeaders(),
bodyBytes,
response.headers().contentType()
.map(MimeType::getCharset)
.orElse(StandardCharsets.ISO_8859_1));
}
);
}
else {
return Mono.just(response);
}
});
因此,某些WebClientResponseException
将被放入回复Mono
中。在任何反应式或非反应式下游中,将这样处理异常:
protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
Object errorChannel = resolveErrorChannel(requestMessage.getHeaders());
Throwable result = ex;
if (!(ex instanceof MessagingException)) {
result = new MessageHandlingException(requestMessage, ex);
}
if (errorChannel == null) {
logger.error("Async exception received and no 'errorChannel' header exists and no default "
+ "'errorChannel' found", result);
}
else {
try {
sendOutput(new ErrorMessage(result), errorChannel, true);
}
catch (Exception e) {
Exception exceptionToLog =
IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage,
() -> "failed to send error message in the [" + this + ']', e);
logger.error("Failed to send async reply", exceptionToLog);
}
}
}
从请求消息的标题中提取errorChannel
,并回退到全局IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME
。
拥有所有这些,您应该订阅这样一个错误通道以处理这些错误WebClientResponseException
个实例。在Spring Framework文档中查看有关RestTemplate
的更多信息:https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#rest-client-access