Kafka Streams与RocksDB的内存分配问题

问题描述 投票:1回答:1

我想做一个简单的Kafka流应用程序(v2.3.1最初是2.3.0),它可以收集指定时间间隔的统计数据(例如,每分钟即翻滚的窗口)。

events
  .groupByKey()
  .windowedBy(
    TimeWindows.of(Duration.ofMinutes(1).grace(Duration.ZERO))
    aggregate(...),Materialized.as("agg-metric")).withRetention(Duration.ofMinutes(5))

  .suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
  .toStream((key, value) -> key.key())

除了我的内存占用量不断增加外,一切似乎都很正常。我可以看到我的堆内存是稳定的,所以我认为这个问题与创建的RocksDB实例有关。

我有10个分区,由于每个窗口(每个分区默认为3个段)创建3个段,我希望总共有30个RocksDB实例。由于RocksDB的默认配置值对于我的应用来说比较大,所以我选择改变RocksDB的默认配置,并根据下面的代码实现RocksDBConfigSetter,本质上是想对离堆内存消耗进行限制。

private static final long BLOCK_CACHE_SIZE = 8 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
private static final long WRITE_BUFFER_SIZE = 2 * 1024 * 1024L;
private static final int MAX_WRITE_BUFFERS = 2;

private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(BLOCK_CACHE_SIZE); // 8MB
private org.rocksdb.Filter filter = new org.rocksdb.BloomFilter();

@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
     BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig();
    tableConfig.setBlockCache(cache) // 8 MB
    tableConfig.setBlockSize(BLOCK_SIZE); // 4 KB
    tableConfig.setCacheIndexAndFilterBlocks(true);;
    tableConfig.setPinTopLevelIndexAndFilter(true);

    tableConfig.setFilter(new org.rocksdb.BloomFilter())
    options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS); // 2 memtables
    options.setWriteBufferSize(WRITE_BUFFER_SIZE);  // 2 MB
    options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);

    options.setTableFormatConfig(tableConfig);
}

根据上面的配置值和 https:/docs.confluent.iocurrentstreamssizing.html 我预计分配给RocksDB的总内存将是

 (write_buffer_size_mb * write_buffer_count) + block_cache_size_mb => 2*2 + 8 => 12MB

因此,对于30个实例,分配的离堆内存总量将占到12*30=。360 MB.

当我尝试在一个有2G内存的虚拟机上运行这个应用时,我为kafka流应用的堆分配了512MB,所以根据我的逻辑理解,分配的总内存应该稳定在一个低于1GB(512+360)的值。

不幸的是,事实似乎并非如此,因为我的内存并没有停止增长,尽管在某一时刻后增长缓慢,但每天稳定地增长约2%,而且不可避免地在某一时刻消耗所有虚拟机的内存,并最终杀死该进程.更令人担忧的事实是,我从来没有看到任何堆外内存的释放,即使当我的流量变得非常低。

因此,我想知道,在这样一个简单而常见的用例中,我到底做错了什么。在计算我的应用程序的内存消耗时,我是否遗漏了什么?我是否可以在我的虚拟机上限制我的内存消耗,我需要在我的配置上改变什么设置来限制Kafka流应用程序和RocksDB实例的内存分配?

apache-kafka-streams rocksdb
1个回答
© www.soinside.com 2019 - 2024. All rights reserved.