我对Kafka Streams很陌生,遇到了问题。
我有两个表-一个用于长期数据(descriptions),另一个用于实时数据(live)。他们有一个共同的id。
并且想法是存储来自descriptions的数据(大概存储在KTable中,保留每个id的最新描述),以及当新消息出现在live中时-与descriptions 放在相应的[[id上,然后进一步发送。
为简单起见,让我们将所有类型都设置为String。所以基本思想就像我所看过的每一篇教程:
interface Processor {
@Input("live")
KStream<String, String> input();
@Input("descriptions")
KTable<String, String> input();
@Output("output")
KStream<String, String> output();
}
然后:
@StreamListener @SendTo("output") public KStream<String, String> process( @Input("live") KStream<String, String> live, @Input("descriptions") KTable<String, String> descriptions) { // ... }
问题是descriptions主题不适用于KTable(空键,仅消息)。
因此,我不能将其用作输入,也不能创建任何新的中间主题来存储此表中的有效流(基本上是只读的。)>我正在搜索某种内存绑定目标,但无济于事。
我认为可能的方式类似于创建一个
intermediate
输出,该输出仅将KTable存储在内存中或其他东西,然后使用此intermediate作为live处理中的输入。喜欢:@StreamListener("descriptions") @SendTo("intermediate") public KTable<String, String> process(@Input("descriptions") KStream<String, String> descriptions) { // ... }
希望这种绑定语义是可能的。我是Kafka Streams的新手,遇到了问题。我有两个表-一个用于长期数据(描述),另一个用于实时数据(实时)。他们有一个共同的ID。想法是...