I'm using Spring Cloud Gateway in my service, with the following RequestDecorator as a wrapper in my LoggingFilter.
public class RequestDecorator extends ServerHttpRequestDecorator {
  private final List<DataBuffer> dataBuffers = new ArrayList<>();

  public RequestDecorator(ServerHttpRequest delegate) {
    super(delegate);
    super.getBody()
        .map(
            dataBuffer -> {
              dataBuffers.add(dataBuffer);
              return dataBuffer;
            })
        .subscribe();
  }

  @Override
  public Flux<DataBuffer> getBody() {
    return copy();
  }

  private Flux<DataBuffer> copy() {
    return Flux.fromIterable(dataBuffers)
        .map(dataBuffer -> dataBuffer.factory().wrap(dataBuffer.asByteBuffer()));
  }
}
When I run a performance test against the service with JMeter, I get the following memory leak error in the logs.
i.n.u.ResourceLeakDetector : - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:785)
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:834)
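After some searching online, I found the following comment: "If you use DataBuffer, you may run into the same error. Spring provides the DataBufferUtils utility class to release resources."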
DataBufferUtils.release(dataBuffer);
However, I'd like to know exactly how to use it in my decorator class, since I use this wrapper in my LoggingFilter.
Can anyone advise?
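One pattern that may help here is "copy, then release": copy each incoming buffer's bytes onto the heap, release the pooled buffer right away, and hand out fresh wrappers around the heap copy whenever the body is read again. The sketch below only illustrates that ownership rule with plain JDK types. `FakePooledBuffer`, `BodyCache` and `CopyThenReleaseDemo` are hypothetical names invented for the illustration, and `FakePooledBuffer.release()` merely stands in for `DataBufferUtils.release(dataBuffer)`; none of them are Spring or Netty classes. In a real decorator the same two steps would be applied to each `DataBuffer`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pooled, reference-counted buffer (NOT a Spring/Netty type).
final class FakePooledBuffer {
    private final byte[] content;
    private int refCount = 1; // a freshly allocated buffer starts with one reference

    FakePooledBuffer(String text) {
        this.content = text.getBytes(StandardCharsets.UTF_8);
    }

    ByteBuffer asByteBuffer() {
        if (refCount == 0) {
            throw new IllegalStateException("buffer already released");
        }
        return ByteBuffer.wrap(content).asReadOnlyBuffer();
    }

    // Stand-in for DataBufferUtils.release(dataBuffer) in this sketch.
    void release() {
        if (refCount == 0) {
            throw new IllegalStateException("buffer already released");
        }
        refCount--;
    }

    // True while nobody has released the buffer, i.e. what a leak detector would report.
    boolean isLeaked() {
        return refCount > 0;
    }
}

// Hypothetical cache that keeps heap copies only, never the pooled buffers themselves.
final class BodyCache {
    private final List<byte[]> chunks = new ArrayList<>();

    void accept(FakePooledBuffer buffer) {
        try {
            ByteBuffer view = buffer.asByteBuffer();
            byte[] copy = new byte[view.remaining()];
            view.get(copy);      // step 1: copy the bytes onto the heap
            chunks.add(copy);
        } finally {
            buffer.release();    // step 2: always give the pooled buffer back
        }
    }

    // Every caller gets fresh read-only views over the heap copies.
    List<ByteBuffer> replay() {
        List<ByteBuffer> views = new ArrayList<>();
        for (byte[] chunk : chunks) {
            views.add(ByteBuffer.wrap(chunk).asReadOnlyBuffer());
        }
        return views;
    }
}

final class CopyThenReleaseDemo {
    public static void main(String[] args) {
        FakePooledBuffer first = new FakePooledBuffer("{\"id\":");
        FakePooledBuffer second = new FakePooledBuffer("42}");

        BodyCache cache = new BodyCache();
        cache.accept(first);
        cache.accept(second);

        StringBuilder body = new StringBuilder();
        for (ByteBuffer view : cache.replay()) {
            body.append(StandardCharsets.UTF_8.decode(view));
        }
        System.out.println("body = " + body);
        System.out.println("leaked = " + (first.isLeaked() || second.isLeaked()));
    }
}
```

The point of the design is that the cache never owns anything reference-counted, so nothing is left for the leak detector to find once the request is finished. The trade-off is one extra heap copy of the body per request.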
Hi bro, here is my logging filter. I only log the request, because in my project the response can be 4-5 MB and I don't want to see it in the logs.
@Slf4j
@Component
public class LoggingFilter implements GlobalFilter, Ordered {
  @Autowired private Tracer tracer;

  private static final Set<String> LOGGABLE_CONTENT_TYPES =
      new HashSet<>(
          Arrays.asList(
              MediaType.APPLICATION_JSON_VALUE.toLowerCase(),
              MediaType.APPLICATION_JSON_UTF8_VALUE.toLowerCase(),
              MediaType.TEXT_PLAIN_VALUE,
              MediaType.TEXT_XML_VALUE));

  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    String traceId =
        tracer.currentSpan() != null
            ? tracer.currentSpan().context().traceIdString()
            : tracer.nextSpan().context().traceIdString();
    ServerHttpRequest mutatedServerHttpRequest =
        exchange.getRequest().mutate().header("x-b3-traceid", traceId).build();
    var requestMutated =
        new ServerHttpRequestDecorator(mutatedServerHttpRequest) {
          @Override
          public Flux<DataBuffer> getBody() {
            var requestLogger = new Logger(getDelegate());
            if (LOGGABLE_CONTENT_TYPES.contains(
                String.valueOf(getHeaders().getContentType()).toLowerCase())) {
              return super.getBody()
                  .map(
                      ds -> {
                        requestLogger.appendBody(ds.asByteBuffer());
                        return ds;
                      })
                  .doFinally((s) -> requestLogger.log())
                  .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            } else {
              requestLogger.log();
              return super.getBody().doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            }
          }
        };
    var responseMutated =
        new ServerHttpResponseDecorator(exchange.getResponse()) {
          @Override
          public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            // var responseLogger = new Logger(getDelegate());
            if (LOGGABLE_CONTENT_TYPES.contains(
                String.valueOf(getHeaders().getContentType()).toLowerCase())) {
              return join(body)
                  .flatMap(
                      db -> {
                        // responseLogger.appendBody(db.asByteBuffer());
                        // responseLogger.log();
                        return getDelegate().writeWith(Mono.just(db));
                      })
                  .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            } else {
              // responseLogger.log();
              return getDelegate()
                  .writeWith(body)
                  .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            }
          }
        };
    return chain.filter(
        exchange.mutate().request(requestMutated).response(responseMutated).build());
  }

  private Mono<? extends DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
    Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
    return Flux.from(dataBuffers)
        .collectList()
        .filter((list) -> !list.isEmpty())
        .map((list) -> list.get(0).factory().join(list))
        .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  }

  @Override
  public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE;
  }

  @ToString
  private class Logger {
    private Map<String, String> headers;
    private HttpStatus status;
    private String path;
    private String body;

    Logger(ServerHttpResponse response) {
      headers = response.getHeaders().toSingleValueMap();
      status = HttpStatus.valueOf(response.getStatusCode().value());
    }

    Logger(ServerHttpRequest request) {
      // Fall back to the propagated header when there is no active span.
      if (tracer.currentSpan() == null || tracer.currentSpan().context() == null) {
        MDC.put("traceId", request.getHeaders().getFirst("x-b3-traceid"));
      } else {
        MDC.put("traceId", tracer.currentSpan().context().traceIdString());
      }
      headers = request.getHeaders().toSingleValueMap();
      path = request.getMethod() + " " + request.getPath();
    }

    void appendBody(ByteBuffer byteBuffer) {
      // Append rather than overwrite, so a body that arrives in several chunks is logged in full.
      String chunk = StandardCharsets.UTF_8.decode(byteBuffer).toString();
      body = (body == null) ? chunk : body + chunk;
    }

    void log() {
      log.info(this.toString());
    }
  }
}
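One detail of `appendBody` in the filter above: it decodes every chunk on its own, so a multi-byte UTF-8 character that happens to be split across two chunks may come out garbled in the log. A possible alternative, sketched below with JDK classes only, is to collect the raw bytes first and decode once when the log line is written. `BodyAccumulator` and `BodyAccumulatorDemo` are hypothetical helpers made up for this illustration; they are not part of the filter or of Spring.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: collects raw body bytes and decodes them in one go.
final class BodyAccumulator {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

    void append(ByteBuffer chunk) {
        // Work on a duplicate so the caller's position and limit stay untouched.
        ByteBuffer view = chunk.duplicate();
        byte[] copy = new byte[view.remaining()];
        view.get(copy);
        bytes.write(copy, 0, copy.length);
    }

    // Decoding the whole body at once keeps split multi-byte characters intact.
    String asString() {
        return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
    }
}

final class BodyAccumulatorDemo {
    public static void main(String[] args) {
        // "café" is 5 bytes in UTF-8; the split below cuts the 2-byte "é" in half.
        byte[] all = "caf\u00e9".getBytes(StandardCharsets.UTF_8);

        BodyAccumulator accumulator = new BodyAccumulator();
        accumulator.append(ByteBuffer.wrap(all, 0, 4));
        accumulator.append(ByteBuffer.wrap(all, 4, 1));

        System.out.println(accumulator.asString());
    }
}
```

This keeps a second copy of the body in memory until the log line is written, so it is probably only reasonable for small request bodies like the JSON and text types the filter already restricts itself to.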