是否可以重新使用GlobalKTable?

问题描述 投票:4回答:1

我想ReKey一个GlobalKTable(可能在初始化它时,因为我相信它们只在创建时才被读取)。

这可能吗?

我在Spring / Java Kafka Streams应用程序中有两个主题。第一个是没有压缩,第二个是。两者都使用Avro作为键和值。

该应用程序从第一个(非压缩)主题流式传输记录,并通过KStream#leftJoin附加来自压缩主题的其他数据。压缩的主题已经作为GlobalKTable被带入应用程序,通过StreamsBuilder#globalTable()创建并且需要保持这种方式(我需要来自应用程序的每个实例中可用主题的所有分区的每条记录)。

我知道有关于支持非主键连接(https://issues.apache.org/jira/browse/KAFKA-3705)的讨论,但据我所知,我还不能这样做......

@Configuration
@EnableKafkaStreams
public class StreamsConfig {

  @Autowired
  private MyCustomSerdes serdes;

  @Bean
  public KStream<AvroKeyOne, AvroValueOne> reKeyJoin(StreamsBuilder streamsBuilder) {

    GlobalKTable<AvroKeyOne, AvroValueOne> globalTable = streamsBuilder.globalTable("topicOne", Consumed.with(
      serdes.getAvroKeyOne()
      serdes.getAvroValueOne()
    ));

    KStream<AvroKeyTwo, AvroValueOne> kStream = streamsBuilder.stream("topicTwo", Consumed.with(
      serdes.getAvroKeyTwo(),
      serdes.getAvroValueOne()
    ));

    kStream.join(
      globalTable,
       /**
        * the KeyValueMapper. I need to rekey the Global table as well to the
        * corresponding String (which it's data will have) if I want this join
        * to return results
        */
      (streamKey, streamValue) -> {return streamKey.getNewStringKey()},
      (/**ValueJoiner Deal**/)
    );
  }

}
java spring apache-kafka apache-kafka-streams spring-kafka
1个回答
2
投票

我想ReKey一个GlobalKTable(可能在初始化它时,因为我相信它们只在创建时才被读取)。

这可能吗?

今天没有直接的支持。您已经提到过即将开展的工作,例如添加support to global tables for non-primary-key joins,但目前还没有。

您今天可以做什么:您可以将原始Kafka主题重新键入(重新分区)为新主题,然后将重新键入的主题读入您的全局KTable。也许这是你的选择。

© www.soinside.com 2019 - 2024. All rights reserved.