Spring StreamBridge 在处理流桥上的批量消息时内存不足

问题描述 投票:0回答:0

我正在使用流桥通过 pubsub 发送消息

我有一些 10 万条消息需要推送到 pubsub。我正在使用 5 个线程的执行程序池来完成这项工作

private void fetchAndPublish(List<MyObject> list) {
    int threads = 5 < list.size() ? 5 : list.size();
    ExecutorService es = Executors.newFixedThreadPool(threads);
    CompletableFuture<?>[] futures = list.stream()
        .map(s -> new MyRunnable(s))
        .map(task -> CompletableFuture.runAsync(task, es))
        .toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(futures).join();
    es.shutdown();
  }
  
   public class MyRunnable implements Runnable {

    private final MyObject myObject;

    public MyRunnable(MyObject myObject) {
      this.myObject = myObject;
    }

    @Override
    public void run() {
        try {
          Message<MyObject> message = MessageBuilder.withPayload(myObject)
              .build();
          messagePublisherService.publish(message);
        } catch (Exception exception) {
          log.error("Failed ", exception);
        }
    }
  }
  

我发布消息的服务来了

@Component
@Slf4j
public class MessagePublisherService {

  @Value("${publish.destination:myDes-out-0}")
  private String destination;

  @Autowired
  private StreamBridge streamBridge;

  public void publish(Message<MyObject> message) {
    log.info("publishing {}", message.getPayload().getId());
    streamBridge.send(destination, message);
  }

}

效果很好,但一段时间后我遇到内存不足异常。我注意到的是,一旦数据被推送到 streamBridge

,内存就不会释放

我有 16GB 的堆内存,很快就被填满了。我的数据不是那么大。

Dynatrace 统计数据 我可以看到最初对 pubsubs 的发布调用花费了 200+ms。但是这个时间逐渐增加并占用了资源

这是我的 Dynatrace 慢慢地我们可以看到,在开始时发布到 pubsub 的请求花费了很少的时间

一段时间后开始占用大量资源

然后占用资源

spring-integration spring-cloud google-cloud-pubsub spring-cloud-stream spring-gcp
© www.soinside.com 2019 - 2024. All rights reserved.