所以我有一个流写入全局状态存储并从中读取,并且在很多时候它最终是一致的
我已经尝试过他的顺序:
1.) 新消息到流
2.) 从全局存储中获取 key1
3.) key1 为空,向其写入新值(使用 key1 将新消息推送到支持主题)
4.)新消息马上到来
5.) 再次从全局存储中获取 key1 ...会期望它不为 3 中编写的 null。 但 key1 一次又一次地为空,新的 key1 被放入全局存储的支持主题
有没有办法在这里实现更严格,或者我刚刚接受kafka流中全局存储的最终一致性质?
我也尝试过使用 withCachingEnabled 和 withLoggingDisabled 如果这可能会影响性能,但同样的问题仍然存在。
我的全球商店设置
StoreBuilder<KeyValueStore<MappingKey, Long>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(MAIN_MAPPING_TOPIC.getGlobalStoreName()),
keySerde,
Serdes.Long());
builder.addGlobalStore(
keyValueStoreBuilder,
MAIN_MAPPING_TOPIC.getTopicName(),
Consumed.with(keySerde, Serdes.Long()),
() -> new GlobalStoreUpdater<>(MAIN_MAPPING_TOPIC.getGlobalStoreName()));`
尝试在您的应用程序中使用此设置:
"cache.max.bytes.buffering" : 0
这将确保您的应用程序不会在内存中保留任何缓存,并且它将尽快刷新到磁盘。