我正在使用Kafka和Kafka Streams作为Spring Cloud Stream的一部分。我的Kafka Streams应用程序中流动的数据正在由特定时间窗口聚合和实现:
Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
oneHour.withLoggingEnabled(topicConfig);
events
.map(getStringSensorMeasurementKeyValueKeyValueMapper())
.groupByKey()
.windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
(oneHour));
按照设计,实现的信息也由changelog主题支持。
我们的应用程序还有一个rest端点,它将像这样查询statestore:
ReadOnlyWindowStore<String, Double> windowStore = queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);
查看创建的changelog主题的设置,它将显示:
min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1
我认为当地的国营商店至少会保留61天(约2个月)的信息。然而,似乎只有大约最后一天的数据仍然存储在商店中。
什么可能导致数据很快被删除?
使用解决方案更新Kafka Streams版本2.0.1不包含Materialized.withRetention方法。对于此特定版本,我可以使用以下代码设置状态存储的保留时间,以解决我的问题:
TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
timeWindows.until(retentionMs);
使我的代码写成:
...
.groupByKey()
.windowedBy(timeWindows)
.reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
(oneHour));
...
对于窗口化的KTable
s,存在本地保留时间,并且存在changlog保留时间。您可以通过Materialized.withRetentionTime(...)
设置本地存储保留时间 - 默认值为24h。
对于较旧的Kafka版本,本地商店保留时间通过
Windows#until()
设置。
如果创建了新应用程序,则会创建更改日志主题,其保留时间与本地存储保留时间相同。但是,如果手动增加日志保留时间,这不会影响您的商店保留时间,但您需要相应地更新代码。更改日志主题已存在时也是如此:如果更改本地存储保留时间,则更改日志主题配置不会自动更新。