检查StateStore是否已完全填充

问题描述 投票:0回答:1

我有一个紧凑的主题,约有30个Mio键。我的App将这个主题具体化为KeyValueStore

如何检查KeyValueStore是否完全填充?如果我通过InteractiveQuery查找某个密钥,则需要知道该密钥是否不存在,因为StateStore尚未准备好,或者该密钥确实不存在。

我通过这种方式实现StateStore:


  @Bean
  public Consumer<KTable<Key, Value>> process() {
    return stream -> stream.filter((k, v) -> v != null,
        Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
            .withKeySerde(new KeySerde())
            .withValueSerde(new ValueSerde()));
  }
apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka
1个回答
0
投票

[仅当KafkaStreams的状态从REBALANCING更改为RUNNING时,才能从KafkaStreams实例获取KeyValueStore。您可以使用StreamsBuilderFactoryBeanCustomizer检查此状态转换,以访问基础的KafkaStreams实例。如果您只想检查何时所有状态存储区都已完全填充,以及kafka流线程何时就绪,以便可以获取KeyValueStore,则可以在StateListener上进行监听:

@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
    return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
        if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
            // set flag that `stateStore` store of current KafkaStreams has been fully restore
            // then you can get
        }
    }
}

或者如果您想从KafkaStreams实例中获得商店

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
        kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                //get and assign your store using kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
                //and set flag that `stateStore` store of current KafkaStreams has been fully restore
            }
        });
    });
}

Read more in the docs

请注意,应该只有一个StreamsBuilderFactoryBeanCustomizer实例。

© www.soinside.com 2019 - 2024. All rights reserved.