我正在使用我的
ErrorHandlingDeserializer
上发送的 DefaultKafkaConsumerFactory
处理反序列化错误。
try (ErrorHandlingDeserializer<MyEvent> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(theRealDeserialiser)) {
errorHandlingDeserializer.setFailedDeserializationFunction(myCustomFunction::apply);
return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), consumerKeyDeserializer, errorHandlingDeserializer);
}
我的自定义函数进行一些处理并发布到毒丸主题并返回
null
。
当发生反序列化错误时,我想记录主题、分区和偏移量。我能想到的唯一方法是停止在函数中返回 null 并返回
MyEvent
的新子类型。然后我的 KafkaListener
可以询问新的子类型。
我有一个
@KafkaListener
组件,它监听 ConsumerRecord
,如下所示:
@KafkaListner(....)
public void onMessage(ConsumerRecord<String, MyEvent> record) {
...
...
// if record.value instance of MyNewSubType
// I have access to the topic, partition and offset here, so I could log it here
// I'd have to check that the instance of MyEvent is actually my sub type representing a failed record.
}
这是这样做的方法吗?我知道
null
有特殊含义卡夫卡。
这种子类型方法的缺点是,我必须使用
ErrorHandlingDeserializer
为每种类型创建一个子类型。
不要使用函数;相反,抛出的
DeserializationException
直接传递到容器的 ErrorHandler
。
SeekToCurrentErrorHandler
认为这些异常是致命的并且不会重试它们,它将记录传递给恢复器。
提供了发送记录的
DeadLetterPublishingRecoverer
。
参见https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
和
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters