日志压缩主题不应将重复项与相同键相对。但是在我们的情况下,当发送具有相同键的新值时,不会删除前一个值。可能是什么问题?
val TestCompactState: KTable[String, TestCompact] = builder.table[String, TestCompact](kafkaStreamConfigs.getString("testcompact-source"),
(TestCompactmaterialized).withKeySerde(stringSerde).withValueSerde(TestCompactSerde))
我得到什么实际结果
Offsets Keys Messages
5 {"id":5} {"id":5,"namee":"omer","__deleted":"false"}
6 {"id":5} {"id":5,"namee":"d","__deleted":"false"}
我只想针对同一把钥匙最新记录预期结果
6 {"id":5} {"id":5,"namee":"d","__deleted":"false"}
据我所知,不可能应用日志压缩策略来使每个密钥精确地保留一条消息。即使您设置了cleanup.policy=compact
(主题级别)或log.cleanup.policy=compact
(全局级别),也不能保证仅保留最新消息,而压缩较旧的消息。
根据official Kafka documentation:
日志压缩为我们提供了更精细的保留机制,因此我们保证保留至少每个主键的最新更新