我如何创建一个既是kafka消费者又是kafka生产者的组件?

问题描述 投票:0回答:1

我试图创建一个组件,它从一个主题消耗数据,处理数据并发送到另一个主题,也就是说,我需要让我的组件既是消费者又是生产者。

apache-kafka kafka-consumer-api apache-kafka-streams spring-kafka kafka-producer-api
1个回答
0
投票

听起来你要找的是Kafka Streams API.它是一个开源的Java API,用于在飞行中通过从一个主题读取事件,通过处理步骤运行并写入另一个主题,因此它既是生产者又是消费者。Kafka Streams文档 的例子。

为生产者和消费者设置不同的Serde配置。

 KStream<String, String> wordCountInputStream = streamsBuilder.stream("word-count-input", Consumed.with(Serdes.String(), Serdes.String()));
        KTable<String, Long> wordCounts = wordCountInputStream.mapValues(value -> value.toLowerCase()).....

        wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
© www.soinside.com 2019 - 2024. All rights reserved.