我想了解并可能改进我目前正在处理的一些 Kafka Streams 代码。现在我不完全确定反序列化和序列化过程的完整生命周期。
代码的核心思想相当简单:有一个 Kafka 主题,其中有大量消息传入,并且在入口处,消息被“标记”了密钥中的一些信息。对于这个问题,我们假设密钥以消息的“组”开头,例如“A 组”或“XX 组”。
现在应用程序动态创建 Kafka Streams 以将某些组过滤到新主题。
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// ...
var filteredGroup = "groupYYZ"; // hard-coded here, would come in from a parameter IRL
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("main-topic");
inputStream.filter((key, value) -> key != null && key.startsWith(filteredGroup))
.to(filteredGroup+"-topic", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
我是否正确理解键和值都被反序列化(使用属性中的 serde 类),然后当新消息发送到输出主题时再次序列化?
如果是这样,因为我只使用密钥来过滤消息,有没有办法避免这种情况,只需复制值部分,或者我需要使用 DIY 消费者/生产者设置而不是高级设置来完成此操作流 API?
您可以使用
ByteArraySerde
作为无操作 Serde 值。