如何解决Spring云网关内存泄漏

问题描述 投票:0回答:1

我在我的服务中使用 Spring Cloud Gateway,并使用下面的 RequestDecorator 作为我的 LoggingFilter 中的包装器。

public class RequestDecorator extends ServerHttpRequestDecorator {

  private final List<DataBuffer> dataBuffers = new ArrayList<>();

  public RequestDecorator(ServerHttpRequest delegate) {
    super(delegate);
    super.getBody()
        .map(
            dataBuffer -> {
              dataBuffers.add(dataBuffer);
              return dataBuffer;
            })
        .subscribe();
  }

  @Override
  public Flux<DataBuffer> getBody() {
    return copy();
  }

  private Flux<DataBuffer> copy() {
    return Flux.fromIterable(dataBuffers)
        .map(dataBuffer -> dataBuffer.factory().wrap(dataBuffer.asByteBuffer()));
  }
}

当 Jmeter 使用该服务进行性能测试时,我在日志中收到以下内存泄漏错误。

i.n.u.ResourceLeakDetector               :   - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
    io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
    io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
    io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:785)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Thread.java:834)

在网上查了一些内容后,发现了以下评论—— “如果您使用 DataBuffer,您可能会遇到同样的错误。Spring 有 DataBufferUtils 库来释放资源。”

DataBufferUtils.release(dataBuffer);

但是我想知道如何在我的装饰器类中准确使用它,因为我在 LoggingFilter 中使用这个包装器。

有人可以建议吗?

spring-boot memory-leaks spring-cloud-gateway reactor-netty
1个回答
0
投票

你好兄弟,这是我的日志过滤器。我只记录请求对象,因为在我的项目中响应对象可以是 4 5mb,我不想在日志中看到它。

@Slf4j
@Component
public class LoggingFilter implements GlobalFilter, Ordered {

  @Autowired private Tracer tracer;
  private static final Set<String> LOGGABLE_CONTENT_TYPES =
      new HashSet<>(
          Arrays.asList(
              MediaType.APPLICATION_JSON_VALUE.toLowerCase(),
              MediaType.APPLICATION_JSON_UTF8_VALUE.toLowerCase(),
              MediaType.TEXT_PLAIN_VALUE,
              MediaType.TEXT_XML_VALUE));

  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    String traceId =
        tracer.currentSpan() != null
            ? tracer.currentSpan().context().traceIdString()
            : tracer.nextSpan().context().traceIdString();
    ServerHttpRequest mutatedServerHttpRequest =
        exchange.getRequest().mutate().header("x-b3-traceid", traceId).build();
    var requestMutated =
        new ServerHttpRequestDecorator(mutatedServerHttpRequest) {
          @Override
          public Flux<DataBuffer> getBody() {
            var requestLogger = new Logger(getDelegate());
            if (LOGGABLE_CONTENT_TYPES.contains(
                String.valueOf(getHeaders().getContentType()).toLowerCase())) {
              return super.getBody()
                  .map(
                      ds -> {
                        requestLogger.appendBody(ds.asByteBuffer());
                        return ds;
                      })
                  .doFinally((s) -> requestLogger.log())
                      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            } else {
              requestLogger.log();
              return super.getBody().doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            }
          }
        };
    var responseMutated =
        new ServerHttpResponseDecorator(exchange.getResponse()) {
          @Override
          public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            //var responseLogger = new Logger(getDelegate());
            if (LOGGABLE_CONTENT_TYPES.contains(
                String.valueOf(getHeaders().getContentType()).toLowerCase())) {
              return join(body)
                  .flatMap(
                      db -> {
                        //responseLogger.appendBody(db.asByteBuffer());
                        //responseLogger.log();
                        return getDelegate().writeWith(Mono.just(db));
                      }).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            } else {
              //responseLogger.log();
              return getDelegate().writeWith(body).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
            }
          }
        };

    return chain.filter(
        exchange.mutate().request(requestMutated).response(responseMutated).build());
  }

  private Mono<? extends DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
    Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
    return Flux.from(dataBuffers)
        .collectList()
        .filter((list) -> !list.isEmpty())
        .map((list) -> list.get(0).factory().join(list))
        .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
  }

  @Override
  public int getOrder() {
    return Ordered.HIGHEST_PRECEDENCE;
  }

  @ToString
  private class Logger {
    private Map<String, String> headers;
    private HttpStatus status;
    private String path;
    private String body;

    Logger(ServerHttpResponse response) {
      headers = response.getHeaders().toSingleValueMap();
      status = HttpStatus.valueOf(response.getStatusCode().value());
    }

    Logger(ServerHttpRequest request) {
      if (tracer.currentSpan() == null || tracer.currentSpan().context() == null) {
        MDC.put("traceId", request.getHeaders().getFirst("x-b3-traceid"));
      } else {
        MDC.put("traceId", tracer.currentSpan().context().traceIdString());
      }
      headers = request.getHeaders().toSingleValueMap();
      path = request.getMethod() + " " + request.getPath();
    }

    void appendBody(ByteBuffer byteBuffer) {
      body = StandardCharsets.UTF_8.decode(byteBuffer).toString();
    }

    void log() {
      log.info(this.toString());
    }
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.