我最近从 Spring Kafka 2.8.x 升级到 3.0.9,运行测试后,我注意到我的自定义
recordInterceptor
不再按预期运行。我们的 recordInterceptor
旨在跳过具有我们团队未使用的某些 typeId
值的消息。
经过调查,我发现
earlyRecordInterceptor
中设置KafkaMessageListenerContainer
的条件与之前的版本相比发生了变化。
我在 3.0.9 版本的发行说明中找不到有关此更改的任何信息。
有人遇到过类似的问题或者知道Spring Kafka 3.0.9中与此相关的变化吗?
2.8.x早期记录拦截器
private final RecordInterceptor<K, V> earlyRecordInterceptor =
isInterceptBeforeTx() || this.transactionManager == null
? getRecordInterceptor()
: null;
3.0.9
private final RecordInterceptor<K, V> earlyRecordInterceptor =
isInterceptBeforeTx() && this.transactionManager != null
? getRecordInterceptor()
: null;
下面是我的拦截器代码:
class NotSupportedMessageFilter<K, V> : RecordInterceptor<K, V> {
private val consumerTokens = setOf<String>("sample")
override fun intercept(record: ConsumerRecord<K, V>, consumer: Consumer<K, V>): ConsumerRecord<K, V>? {
if (isNotSupportedMessage(record)) {
consumer.commitAsync()
return null
}
return record
}
private fun isNotSupportedMessage(record: ConsumerRecord<K, V>): Boolean {
val headerIterator: Iterator<Header> = record.headers()
.headers(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).iterator()
if (!headerIterator.hasNext()) {
return false
}
val token = String(headerIterator.next().value(), StandardCharsets.UTF_8)
return !consumerTokens.contains(token)
}
}
https://github.com/spring-projects/spring-kafka/pull/2727 看来所做的更改是为了确保调用 errorHandler。
这是否意味着以前由我的自定义拦截器处理的功能现在应该在 errorHandler 中实现?
您需要将
ErrorHandlingDeserializer
与 setFailedDeserializationFunction()
一起使用。如果未提供该函数,则 ErrorHandlingDeserializer
返回 null
。
还有
ConsumerProperties.checkDeserExWhenValueNull
房产供您考虑。但这仍然会在拦截器之前抛出异常。