使用 Kafka Streams 仅基于密钥过滤和转发 Kafka 消息

问题描述 投票:0回答:1

我想了解并可能改进我目前正在处理的一些 Kafka Streams 代码。现在我不完全确定反序列化和序列化过程的完整生命周期。

代码的核心思想相当简单:有一个 Kafka 主题,其中有大量消息传入,并且在入口处,消息被“标记”了密钥中的一些信息。对于这个问题,我们假设密钥以消息的“组”开头,例如“A 组”或“XX 组”。

现在应用程序动态创建 Kafka Streams 以将某些组过滤到新主题。

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

// ...

var filteredGroup = "groupYYZ"; // hard-coded here, would come in from a parameter IRL
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("main-topic");

inputStream.filter((key, value) -> key != null && key.startsWith(filteredGroup))
           .to(filteredGroup+"-topic", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

我是否正确理解键和值都被反序列化(使用属性中的 serde 类),然后当新消息发送到输出主题时再次序列化?

如果是这样,因为我只使用密钥来过滤消息,有没有办法避免这种情况,只需复制值部分,或者我需要使用 DIY 消费者/生产者设置而不是高级设置来完成此操作流 API?

java apache-kafka apache-kafka-streams
1个回答
0
投票

您可以使用

ByteArraySerde
作为无操作 Serde 值。

© www.soinside.com 2019 - 2024. All rights reserved.