如何使用KStream发送标头

问题描述 投票:0回答:1

我正在研究一个用例,我在其中创建了将数据从mongo发送到elasticsearch的管道。

Mongo - > Spring Boot - > Kafka - > Transformer(KStream) - > Kafka - > Consumer(发送到Elastic Search。)

我必须计算从Mongo到Elastic搜索记录所花费的时间。我想过使用Kafka Headers并继续在下一个Kafka Producer中转发它们的值,最后通过从当前时间戳中减去来计算它们。

我可以从制作人发送标题,但如何从Kafka Streams发送标题。在下面的代码中,我想发送我在消费inTopic时收到的标题并将它们发送到outTopic。

private StreamsBuilder buildStream(final String bootstrapServers, final String inTopic, final String outTopic) {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> kStream = streamsBuilder.stream(inTopic);

    kStream.filter(new Predicate<String, String>() {
        public boolean test(String s, String s2) {
            return true;
        }
    })
    .to(outTopic);
    return streamsBuilder;
}
apache-kafka apache-kafka-streams
1个回答
1
投票

您可以通过ProcessorContext访问处理器API中的标头,并一起使用DSL和处理器API。

我要做的是创建我的Transformer实现并在transform()中使用它。这个问题似乎与你的问题类似,它有一个代码片段的答案,我相信这可能会有所帮助:Set timestamp in output with Kafka Streams

© www.soinside.com 2019 - 2024. All rights reserved.