我正在通过Spring Webflux框架使用Reactor Netty,以便将数据发送到远程内容交付网络。客户端请求完成后,默认的Reactor Netty行为是保持连接活动并将其释放回基础连接池。
[某些内容分发网络建议在某些类型的状态码上重新解析DNS(例如500内部服务器错误)。为此,我添加了一个自定义Netty DnsNameResolver
和DnsCache
,但是我还需要关闭连接,否则它将被释放回池中,并且DNS将不会重新解析。
如何在错误状态代码上关闭连接?
到目前为止,通过在Reactor Netty的ConnectionObserver
中添加TcpClient
,我提出了以下解决方法:
TcpClient tcpClient = TcpClient.create()
.observe((connection, newState) -> {
if (newState == State.RELEASED && connection instanceof HttpClientResponse) {
HttpResponseStatus status = ((HttpClientResponse) connection).status();
if (status.codeClass() != HttpStatusClass.SUCCESS) {
connection.dispose();
}
}
});
即,如果连接已被释放(即放回到连接池中),并且释放是由HTTP客户端响应(状态代码不成功)引起的,则关闭连接。
这种方法感觉笨拙。如果在错误状态代码之后释放了连接,并且观察者正在关闭该连接,那么新请求可以并行获取同一连接吗?框架是在内部优雅地处理事情还是这是使上述方法无效的竞争条件?
谢谢您的帮助!
最好使用doOnResponse或doAfterResponseSuccess,这取决于使用情况,哪一个更合适。
但是等待释放已不成问题
如果连接在错误状态代码后被释放,并且观察者正在关闭该连接,那么新请求可以并行获取相同的连接吗?框架是在内部优雅地处理事情还是这是使上述方法无效的竞争条件?
默认情况下,连接池使用FIFO租赁策略运行,因此,如果池中有空闲连接,您将不会获得相同的连接,如果将连接池切换为LIFO租赁策略,则不会如此。获取时,将检查每个连接是否处于活动状态,并且仅提供一个活动连接供使用。
UPDATE:
您也可以尝试以下仅使用WebClient API而不使用Reactor Netty API的方法:
return this.webClient
.get()
.uri("/500")
.retrieve()
.onStatus(status -> status.equals(HttpStatus.INTERNAL_SERVER_ERROR), clientResponse -> {
clientResponse.bodyToFlux(DataBuffer.class)
.subscribe(new BaseSubscriber<DataBuffer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.cancel();
}
});
return Mono.error(new IllegalStateException("..."));
})
.bodyToMono(String.class);