我想通过普通的Spring Beans查询Kafka Stream global story,例如
@Controller
。
我找到了两个例子:
自动装配 StreamsBuilderFactoryBean(或以其他方式从应用程序上下文中获取对它的引用)并调用 getKafkaStreams()。
factoryBean 是连接到控制器中的 StreamsBuilderFactoryBean 实例。这提供了由该工厂 bean 管理的 KafkaStreams 实例。因此,我们可以获得我们之前创建的 counts key/value state store
@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
);
return counts.get(word);
}
但我有一个疑问——在每个请求上一遍又一遍地调用 factoryBean.getKafkaStreams().store(...) 是否合理?一方面,我检查了
factoryBean.getKafkaStreams().store(...)
下的 Kafka 代码,但我没有做任何沉重的事情。另一方面,我检查了 Kafka 代码factoryBean.getKafkaStreams().store(...)
,它看起来太复杂了,不是公开全局存储的好方法