我有一个紧凑的主题,约有30个Mio键。我的App
将这个主题具体化为KeyValueStore
。
如何检查KeyValueStore
是否完全填充?如果我通过InteractiveQuery
查找某个密钥,则需要知道该密钥是否不存在,因为StateStore
尚未准备好,或者该密钥确实不存在。
我通过这种方式实现StateStore:
@Bean
public Consumer<KTable<Key, Value>> process() {
return stream -> stream.filter((k, v) -> v != null,
Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
.withKeySerde(new KeySerde())
.withValueSerde(new ValueSerde()));
}
[仅当KafkaStreams的状态从REBALANCING
更改为RUNNING
时,才能从KafkaStreams实例获取KeyValueStore。您可以使用StreamsBuilderFactoryBeanCustomizer
检查此状态转换,以访问基础的KafkaStreams实例。如果您只想检查何时所有状态存储区都已完全填充,以及kafka流线程何时就绪,以便可以获取KeyValueStore
,则可以在StateListener
上进行监听:
@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
// set flag that `stateStore` store of current KafkaStreams has been fully restore
// then you can get
}
}
}
或者如果您想从KafkaStreams
实例中获得商店
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
//get and assign your store using kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
//and set flag that `stateStore` store of current KafkaStreams has been fully restore
}
});
});
}
请注意,应该只有一个StreamsBuilderFactoryBeanCustomizer实例。