我有以下拓扑定义,环境中有两个应用程序实例:
KStream<String, ObjectMessage> stream = kStreamBuilder.stream(inputTopic);
stream.mapValues(new ProtobufObjectConverter())
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(100)))
.aggregate(AggregatedObject::new, new ObjectAggregator(), buildStateStore(storeName))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withMaxRecords(config.suppressionBufferSize())))
.mapValues(new AggregatedObjectProtobufConverter())
.toStream((key, value) -> key.key())
.to(outputTopic);
private Materialized<String, AggregatedObject, WindowStore<Bytes, byte[]>> buildStateStore(String storeName) {
return Materialized.<String, AggregatedObject, WindowStore<Bytes, byte[]>>as(storeName)
.withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde<>(AggregatedObject.class));
}
此拓扑是为
for
循环中的多个输入主题创建的,因此一旦应用程序实例具有多个拓扑。每个拓扑都有一个状态存储,由模式 KSTREAM-AGGREGATE-%s-STATE-STORE-0000000001
创建,如 Opening store KSTREAM-AGGREGATE-my.topic.name-STATE-STORE-0000000001
.
现在,直到最近我们没有配置
state-dir
目录并且因为我们使用了K8S状态集,所以存储在重启之间没有保留,所以据我所知应用程序必须重建状态kafka-streams
工作。
我们的日志充满了如下日志,但只是随着时间的变化(最后一个点后的后缀)。
INFO 1 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore Opening store KSTREAM-AGGREGATE-my.topic.name-STATE-STORE-0000000001.1675576920000 in regular mode
然而,以毫秒为单位的时间
1675576920000
是一天前的,有些甚至是3天前的。今天我给应用加了state-dir
,但是还是一直显示这个日志。我们是应该等待一些时间直到一切都处理完还是我们做错了什么?
有人可以向我解释为什么
RocksDBTimestampedStore
记录这么多吗?此外,从这些商店记录的时间不是窗口操作定义的100 ms
,为什么?
如果使用窗口聚合,相应的存储实际上是多个 RocksDB(所谓的段),每个存储特定时间片的所有窗口。 IIRC,最小分段大小为 60 秒。
随着时间的推移,新的段存储被创建(导致您看到的日志行),而旧的段被清理。一般来说没什么好担心的。注意,这只是一个 INFO 日志。