GlobalKTable 作为带有 Kafka Streams Binder 的 QueryableStore

问题描述 投票:0回答:1

我正在使用

spring-boot-starter-parent
版本
3.1.2
spring-cloud-stream-binder-kafka-streams
版本
4.0.3

绝大多数在线示例都显示使用

@Input
注释创建 GlobalKTable,该注释已被弃用。我还没有找到如何使用最新版本的 Kafka Streams Binder 创建 GKT 作为 KeyValueStore 的工作示例。

我发现的最接近的是绕过 Binder 并使用 GKT 手动创建 KafkaStreams bean:

@Bean
public KafkaStreams kafkaStreams(
    @Value("${spring.cloud.stream.kafka.streams.binder.brokers}") String brokers) {
  StreamsBuilder builder = new StreamsBuilder();

  // Create a GlobalKTable
  builder.globalTable(
      Topics.GKT_TOPIC,
      Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as(
              Stores.MY_GKT_STORE_NAME)
          .withKeySerde(KeySerde)
          .withValueSerde(ValueSerde));

  // Setup stream processing logic here...

  Properties config = new Properties();
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
  KafkaStreams streams = new KafkaStreams(builder.build(), config);

  // Start the streams
  streams.start();

  return streams;
}

任何人都可以向我指出如何使用最新版本的 Kafka Streams Binder 执行此操作的文档或示例吗?

apache-kafka spring-kafka apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka
1个回答
0
投票

我终于在文档中找到了如何做到这一点

如上所述,绑定器不提供将全局状态存储注册为功能的一流方法。为此,您需要使用定制器。以下是如何做到这一点。

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        try {
            final StreamsBuilder streamsBuilder = fb.getObject();
            streamsBuilder.addGlobalStore(...);
        }
        catch (Exception e) {

        }
    };
}
© www.soinside.com 2019 - 2024. All rights reserved.