我试图创建一个组件,它从一个主题消耗数据,处理数据并发送到另一个主题,也就是说,我需要让我的组件既是消费者又是生产者。
听起来你要找的是Kafka Streams API.它是一个开源的Java API,用于在飞行中通过从一个主题读取事件,通过处理步骤运行并写入另一个主题,因此它既是生产者又是消费者。Kafka Streams文档 的例子。
为生产者和消费者设置不同的Serde配置。
KStream<String, String> wordCountInputStream = streamsBuilder.stream("word-count-input", Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, Long> wordCounts = wordCountInputStream.mapValues(value -> value.toLowerCase()).....
wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));