Kafka 反序列化错误并记录分区、主题和偏移量

问题描述 投票:0回答:1

我正在使用我的

ErrorHandlingDeserializer
上发送的
DefaultKafkaConsumerFactory
处理反序列化错误。

try (ErrorHandlingDeserializer<MyEvent> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(theRealDeserialiser)) {
    errorHandlingDeserializer.setFailedDeserializationFunction(myCustomFunction::apply);
    return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), consumerKeyDeserializer, errorHandlingDeserializer);
}

我的自定义函数进行一些处理并发布到毒丸主题并返回

null

当发生反序列化错误时,我想记录主题、分区和偏移量。我能想到的唯一方法是停止在函数中返回 null 并返回

MyEvent
的新子类型。然后我的
KafkaListener
可以询问新的子类型。

我有一个

@KafkaListener
组件,它监听
ConsumerRecord
,如下所示:

@KafkaListner(....)
public void onMessage(ConsumerRecord<String, MyEvent> record) {
  ...
  ...
  
// if record.value instance of MyNewSubType
//   I have access to the topic, partition and offset here, so I could log it here
// I'd have to check that the instance of MyEvent is actually my sub type representing a failed record.

}

这是这样做的方法吗?我知道

null
有特殊含义卡夫卡。

这种子类型方法的缺点是,我必须使用

ErrorHandlingDeserializer
为每种类型创建一个子类型。

java spring spring-kafka
1个回答
0
投票

不要使用函数;相反,抛出的

DeserializationException
直接传递到容器的
ErrorHandler

SeekToCurrentErrorHandler
认为这些异常是致命的并且不会重试它们,它将记录传递给恢复器。

提供了发送记录的

DeadLetterPublishingRecoverer

参见https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

© www.soinside.com 2019 - 2024. All rights reserved.