KTable作为具有空键的主题的输入

问题描述 投票:0回答:1

我对Kafka Streams很陌生,遇到了问题。

我有两个表-一个用于长期数据(descriptions),另一个用于实时数据(live)。他们有一个共同的id

并且想法是存储来自descriptions的数据(大概存储在KTable中,保留每个id的最新描述),以及当新消息出现在live中时-与descriptions 放在相应的[[id上,然后进一步发送。

为简单起见,让我们将所有类型都设置为String。

所以基本思想就像我所看过的每一篇教程:

interface Processor { @Input("live") KStream<String, String> input(); @Input("descriptions") KTable<String, String> input(); @Output("output") KStream<String, String> output(); }

然后:

@StreamListener @SendTo("output") public KStream<String, String> process( @Input("live") KStream<String, String> live, @Input("descriptions") KTable<String, String> descriptions) { // ... }

问题是

descriptions主题不适用于KTable(空键,仅消息)。

因此,我不能将其用作输入,也不能创建任何新的中间主题来存储此表中的有效流(基本上是只读的。)>

我正在搜索某种内存绑定目标,但无济于事。

我认为可能的方式类似于创建一个

intermediate

输出,该输出仅将KTable存储在内存中或其他东西,然后使用此intermediate作为live处理中的输入。喜欢:
@StreamListener("descriptions") @SendTo("intermediate") public KTable<String, String> process(@Input("descriptions") KStream<String, String> descriptions) { // ... }
希望这种绑定语义是可能的。

我是Kafka Streams的新手,遇到了问题。我有两个表-一个用于长期数据(描述),另一个用于实时数据(实时)。他们有一个共同的ID。想法是...

spring-boot apache-kafka apache-kafka-streams spring-cloud-stream
1个回答
1
投票
我认为您可以尝试通过引入初始处理器来引入用于存储键/值的中间主题。然后,将该流用作常规处理器中输入的表。这是一些模板。我正在Spring Cloud Stream中使用新的功能模型来编写这些处理器。
© www.soinside.com 2019 - 2024. All rights reserved.