Kafka-Streams 消费者的记录拦截器

问题描述 投票:0回答:2

我正在寻找 Kafka-streams 来进行事件处理。我尝试为 Kakfa-Streams 添加一个拦截器(针对消费者)。

我添加了一个 RecordInterceptor,如下所示:

  configMap.put(consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.package.to.interceptor.MyCustomRecordInterceptor");

但是我在启动过程中遇到错误:

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: java.lang.ClassCastException: class com.package.to.interceptor.MyCustomRecordInterceptor

如果我添加一个实现的拦截器,它就可以正常工作

org.apache.kafka.clients.consumer.ConsumerInterceptor
.

但是我需要一个 RecordInterceptor。

我的问题是,有没有办法将 RecordInterceptor 实现作为 Consumer 拦截器添加到 Kafka-streams 中?非常感谢任何帮助。

apache-kafka apache-kafka-streams
2个回答
-1
投票

Kafa Stream 在内部使用生产者和消费者,因此您必须为属性添加前缀。

对于消费者

configMap.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),"com.package.to.interceptor.MyCustomRecordInterceptor")

对于制作人

configMap.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),"com.package.to.interceptor.MyCustomRecordProducerInterceptor")

-1
投票

我需要一个记录拦截器

RecordInterceptor 是一个 spring-kafka 接口,而不是普通的 Kafka API

消费者只会接受 ConsumerInterceptor 的实现。你的代码是正确的,否则。

生产者仅接受 ProducerInterceptor,并且您可以在 Streams 配置/映射中使用 ProducerPrefix

© www.soinside.com 2019 - 2024. All rights reserved.