我正在试图弄清楚如何将订阅多个主题的Kafka消费者的消息传递到基于主题的处理阶段(例如将它们保存到特定文件或数据库或其他任何内容)。
有一个Consumer.externalCommittableSource但它需要手动选择分区,这是我想要避免的。
Consumer.externalCommittableSource
通常,基于流元素的某些分组属性的值动态创建流和接收器的正确方法是什么?
好像你正在寻找groupBy运营商。
groupBy