我正在使用流桥通过 pubsub 发送消息
我有一些 10 万条消息需要推送到 pubsub。我正在使用 5 个线程的执行程序池来完成这项工作
private void fetchAndPublish(List<MyObject> list) {
int threads = 5 < list.size() ? 5 : list.size();
ExecutorService es = Executors.newFixedThreadPool(threads);
CompletableFuture<?>[] futures = list.stream()
.map(s -> new MyRunnable(s))
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();
}
public class MyRunnable implements Runnable {
private final MyObject myObject;
public MyRunnable(MyObject myObject) {
this.myObject = myObject;
}
@Override
public void run() {
try {
Message<MyObject> message = MessageBuilder.withPayload(myObject)
.build();
messagePublisherService.publish(message);
} catch (Exception exception) {
log.error("Failed ", exception);
}
}
}
我发布消息的服务来了
@Component
@Slf4j
public class MessagePublisherService {
@Value("${publish.destination:myDes-out-0}")
private String destination;
@Autowired
private StreamBridge streamBridge;
public void publish(Message<MyObject> message) {
log.info("publishing {}", message.getPayload().getId());
streamBridge.send(destination, message);
}
}
效果很好,但一段时间后我遇到内存不足异常。我注意到的是,一旦数据被推送到 streamBridge
,内存就不会释放我有 16GB 的堆内存,很快就被填满了。我的数据不是那么大。
Dynatrace 统计数据 我可以看到最初对 pubsubs 的发布调用花费了 200+ms。但是这个时间逐渐增加并占用了资源
这是我的 Dynatrace 慢慢地我们可以看到,在开始时发布到 pubsub 的请求花费了很少的时间
一段时间后开始占用大量资源