Kafka Streams 开启 StateStore 日志循环

问题描述 投票:0回答:1

我有以下拓扑定义,环境中有两个应用程序实例:

    KStream<String, ObjectMessage> stream = kStreamBuilder.stream(inputTopic);
    stream.mapValues(new ProtobufObjectConverter())
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(100)))
            .aggregate(AggregatedObject::new, new ObjectAggregator(), buildStateStore(storeName))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withMaxRecords(config.suppressionBufferSize())))
            .mapValues(new AggregatedObjectProtobufConverter())
            .toStream((key, value) -> key.key())
            .to(outputTopic);

private Materialized<String, AggregatedObject, WindowStore<Bytes, byte[]>> buildStateStore(String storeName) {
    return Materialized.<String, AggregatedObject, WindowStore<Bytes, byte[]>>as(storeName)
            .withKeySerde(Serdes.String())
            .withValueSerde(new JsonSerde<>(AggregatedObject.class));
}

此拓扑是为

for
循环中的多个输入主题创建的,因此一旦应用程序实例具有多个拓扑。每个拓扑都有一个状态存储,由模式
KSTREAM-AGGREGATE-%s-STATE-STORE-0000000001
创建,如
Opening store KSTREAM-AGGREGATE-my.topic.name-STATE-STORE-0000000001
.

现在,直到最近我们没有配置

state-dir
目录并且因为我们使用了K8S状态集,所以存储在重启之间没有保留,所以据我所知应用程序必须重建状态
kafka-streams 
工作。

我们的日志充满了如下日志,但只是随着时间的变化(最后一个点后的后缀)。

INFO 1 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore Opening store KSTREAM-AGGREGATE-my.topic.name-STATE-STORE-0000000001.1675576920000 in regular mode

然而,以毫秒为单位的时间

1675576920000
是一天前的,有些甚至是3天前的。今天我给应用加了
state-dir
,但是还是一直显示这个日志。我们是应该等待一些时间直到一切都处理完还是我们做错了什么?

有人可以向我解释为什么

RocksDBTimestampedStore
记录这么多吗?此外,从这些商店记录的时间不是窗口操作定义的
100 ms
,为什么?

java apache-kafka apache-kafka-streams
1个回答
0
投票

如果使用窗口聚合,相应的存储实际上是多个 RocksDB(所谓的段),每个存储特定时间片的所有窗口。 IIRC,最小分段大小为 60 秒。

随着时间的推移,新的段存储被创建(导致您看到的日志行),而旧的段被清理。一般来说没什么好担心的。注意,这只是一个 INFO 日志。

© www.soinside.com 2019 - 2024. All rights reserved.