Kafka Streams 内存使用率出乎意料地高

问题描述 投票:0回答:2

我正在 Kubernetes 中运行 Kafka Streams 应用程序,该应用程序只需将主题直接读取到

GlobalKTable
中,然后提供 HTTP API 来获取键值对。这个 pod 的内存使用量对我来说出乎意料地高,我不明白为什么它使用这么多内存。

我已经根据

https://docs.confluence.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb
配置了BoundedMemoryRocksDBConfigSetter

大致

abstract class BoundedMemoryRocksDBConfig(blockCacheSize: Long = 50 * BoundedMemoryRocksDBConfig.Mi,
                                          numPartitions: Int = 10,
                                          ttl: Long = 180 * 24 * 60 * 60L)
  extends RocksDBConfigSetter
    with LazyLogging {
 
  private val totalOffHeapMemory = numPartitions * blockCacheSize
  private val totalMemTableMemory = totalOffHeapMemory
  assert(totalMemTableMemory <= totalOffHeapMemory)

  override def setConfig(storeName: String, options: Options, configs: util.Map[String, AnyRef]): Unit =
    BoundedMemoryRocksDBConfig.synchronized {
      if (!initialized) {
        sharedCache = Some(new org.rocksdb.LRUCache(totalOffHeapMemory))
        sharedWriteBufferManager = Some(new org.rocksdb.WriteBufferManager(totalOffHeapMemory, sharedCache.get))
        initialized = true
      }

      val tableConfig = options.tableFormatConfig.asInstanceOf[BlockBasedTableConfig]
      // These three options in combination will limit the memory used by RocksDB to the size passed
      // to the block cache (totalOffHeapMemory)
      tableConfig.setBlockCache(sharedCache.get)
      tableConfig.setCacheIndexAndFilterBlocks(true)
      options.setWriteBufferManager(sharedWriteBufferManager.get)
      options.setTableFormatConfig(tableConfig)

      val compactionsOptions = new CompactionOptionsFIFO()
      options.setCompactionOptionsFIFO(compactionsOptions)
      options.setTtl(ttl)
    }

  override def close(storeName: String, options: Options): Unit = {
    // do not close cache or writeBufferManager because they are shared among all stream tasks
  }
}
object BoundedMemoryRocksDBConfig {
  private var initialized = false
  private var sharedCache: Option[org.rocksdb.LRUCache] = None
  private var sharedWriteBufferManager: Option[org.rocksdb.WriteBufferManager] = None

  val Ki: Long = 1024L
  val Mi: Long = Ki * Ki
}

主题中有 10 个分区,块缓存和内存表/写入缓冲区之间共享

LRUCache
50 MB。此外,Kafka Streams 整个拓扑有一个默认为 10 MB 的记录缓存,以及生产者/消费者缓冲区、TCP 发送/接收缓冲区和大小可以忽略不计的反序列化缓冲区。根据我们的指标,Pod 中的 JVM 有 200 MB 堆提交内存和另外 160 MB JVM 非堆内存使用量。因此,我预计 Pod 内存使用量最多为 10 * 50 MB 块缓存和写入缓冲区 + 10 MB 记录缓存 + 360 MB JVM 内存 = 870 MB,但 Pod 内存使用量远高于 2 GB 并且不断上升。

请注意,Kafka-Streams 收集的以下 RocksDB 指标通常是按 task_id 收集的(这里有时会聚合为 task_id 的总和)。我认为指标值是错误的,实际上应该是 1/10,因为有 10 个主题/任务具有共享

LRUCache
并且 RocksDB 不知道它是共享的,因此报告 500 MB 块缓存使用情况每个任务都是分开的,而实际上所有任务都有相同的 500 MB 内存缓冲区。

apache-kafka jvm apache-kafka-streams rocksdb
2个回答
0
投票

除非使用 Direct IO (https://github.com/facebook/rocksdb/wiki/Direct-IO),来自操作系统的共享文件缓存页面可以增加您的 RSS。如果您有一种跟踪匿名内存使用情况的方法,那么测量“内存使用情况”会更准确。

顺便说一句,RocksDB 通常不会直接使用 mmaped 文件缓存(仅通过其他函数),因为它通常必须在将数据接纳到 RocksDB 块缓存之前对数据进行解压缩和验证校验和。


0
投票

取决于您访问 Globalktable 的方式。像 .all() 这样的方法很少,它是可自动关闭的,所以你应该在 - try with resources 中使用它。这只是一个可能导致内存泄漏的示例。

© www.soinside.com 2019 - 2024. All rights reserved.