我正在寻找 Kafka-streams 来进行事件处理。我尝试为 Kakfa-Streams 添加一个拦截器(针对消费者)。
我添加了一个 RecordInterceptor,如下所示:
configMap.put(consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.package.to.interceptor.MyCustomRecordInterceptor");
但是我在启动过程中遇到错误:
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: java.lang.ClassCastException: class com.package.to.interceptor.MyCustomRecordInterceptor
如果我添加一个实现的拦截器,它就可以正常工作
org.apache.kafka.clients.consumer.ConsumerInterceptor
.
但是我需要一个 RecordInterceptor。
我的问题是,有没有办法将 RecordInterceptor 实现作为 Consumer 拦截器添加到 Kafka-streams 中?非常感谢任何帮助。
Kafa Stream 在内部使用生产者和消费者,因此您必须为属性添加前缀。
对于消费者
configMap.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),"com.package.to.interceptor.MyCustomRecordInterceptor")
对于制作人
configMap.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),"com.package.to.interceptor.MyCustomRecordProducerInterceptor")
我需要一个记录拦截器
RecordInterceptor 是一个 spring-kafka 接口,而不是普通的 Kafka API
消费者只会接受 ConsumerInterceptor 的实现。你的代码是正确的,否则。
生产者仅接受 ProducerInterceptor,并且您可以在 Streams 配置/映射中使用 ProducerPrefix