我正在 Kubernetes 中运行 Kafka Streams 应用程序,该应用程序只需将主题直接读取到
GlobalKTable
中,然后提供 HTTP API 来获取键值对。这个 pod 的内存使用量对我来说出乎意料地高,我不明白为什么它使用这么多内存。
我已经根据
https://docs.confluence.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb配置了
BoundedMemoryRocksDBConfigSetter
大致
abstract class BoundedMemoryRocksDBConfig(blockCacheSize: Long = 50 * BoundedMemoryRocksDBConfig.Mi,
numPartitions: Int = 10,
ttl: Long = 180 * 24 * 60 * 60L)
extends RocksDBConfigSetter
with LazyLogging {
private val totalOffHeapMemory = numPartitions * blockCacheSize
private val totalMemTableMemory = totalOffHeapMemory
assert(totalMemTableMemory <= totalOffHeapMemory)
override def setConfig(storeName: String, options: Options, configs: util.Map[String, AnyRef]): Unit =
BoundedMemoryRocksDBConfig.synchronized {
if (!initialized) {
sharedCache = Some(new org.rocksdb.LRUCache(totalOffHeapMemory))
sharedWriteBufferManager = Some(new org.rocksdb.WriteBufferManager(totalOffHeapMemory, sharedCache.get))
initialized = true
}
val tableConfig = options.tableFormatConfig.asInstanceOf[BlockBasedTableConfig]
// These three options in combination will limit the memory used by RocksDB to the size passed
// to the block cache (totalOffHeapMemory)
tableConfig.setBlockCache(sharedCache.get)
tableConfig.setCacheIndexAndFilterBlocks(true)
options.setWriteBufferManager(sharedWriteBufferManager.get)
options.setTableFormatConfig(tableConfig)
val compactionsOptions = new CompactionOptionsFIFO()
options.setCompactionOptionsFIFO(compactionsOptions)
options.setTtl(ttl)
}
override def close(storeName: String, options: Options): Unit = {
// do not close cache or writeBufferManager because they are shared among all stream tasks
}
}
object BoundedMemoryRocksDBConfig {
private var initialized = false
private var sharedCache: Option[org.rocksdb.LRUCache] = None
private var sharedWriteBufferManager: Option[org.rocksdb.WriteBufferManager] = None
val Ki: Long = 1024L
val Mi: Long = Ki * Ki
}
主题中有 10 个分区,块缓存和内存表/写入缓冲区之间共享
LRUCache
50 MB。此外,Kafka Streams 整个拓扑有一个默认为 10 MB 的记录缓存,以及生产者/消费者缓冲区、TCP 发送/接收缓冲区和大小可以忽略不计的反序列化缓冲区。根据我们的指标,Pod 中的 JVM 有 200 MB 堆提交内存和另外 160 MB JVM 非堆内存使用量。因此,我预计 Pod 内存使用量最多为 10 * 50 MB 块缓存和写入缓冲区 + 10 MB 记录缓存 + 360 MB JVM 内存 = 870 MB,但 Pod 内存使用量远高于 2 GB 并且不断上升。
请注意,Kafka-Streams 收集的以下 RocksDB 指标通常是按 task_id 收集的(这里有时会聚合为 task_id 的总和)。我认为指标值是错误的,实际上应该是 1/10,因为有 10 个主题/任务具有共享
LRUCache
并且 RocksDB 不知道它是共享的,因此报告 500 MB 块缓存使用情况每个任务都是分开的,而实际上所有任务都有相同的 500 MB 内存缓冲区。
除非使用 Direct IO (https://github.com/facebook/rocksdb/wiki/Direct-IO),来自操作系统的共享文件缓存页面可以增加您的 RSS。如果您有一种跟踪匿名内存使用情况的方法,那么测量“内存使用情况”会更准确。
顺便说一句,RocksDB 通常不会直接使用 mmaped 文件缓存(仅通过其他函数),因为它通常必须在将数据接纳到 RocksDB 块缓存之前对数据进行解压缩和验证校验和。
取决于您访问 Globalktable 的方式。像 .all() 这样的方法很少,它是可自动关闭的,所以你应该在 - try with resources 中使用它。这只是一个可能导致内存泄漏的示例。