从 Spring Kafka 2.8.x 升级到 3.0.9 后自定义 recordInterceptor 不工作的问题

问题描述 投票:0回答:1

我最近从 Spring Kafka 2.8.x 升级到 3.0.9,运行测试后,我注意到我的自定义

recordInterceptor
不再按预期运行。我们的
recordInterceptor
旨在跳过具有我们团队未使用的某些
typeId
值的消息。

经过调查,我发现

earlyRecordInterceptor
中设置
KafkaMessageListenerContainer
的条件与之前的版本相比发生了变化。

我在 3.0.9 版本的发行说明中找不到有关此更改的任何信息。

有人遇到过类似的问题或者知道Spring Kafka 3.0.9中与此相关的变化吗?
2.8.x早期记录拦截器

private final RecordInterceptor<K, V> earlyRecordInterceptor =
        isInterceptBeforeTx() || this.transactionManager == null
                ? getRecordInterceptor()
                : null;

3.0.9

private final RecordInterceptor<K, V> earlyRecordInterceptor =
        isInterceptBeforeTx() && this.transactionManager != null
                ? getRecordInterceptor()
                : null;

下面是我的拦截器代码:


class NotSupportedMessageFilter<K, V> : RecordInterceptor<K, V> {

    private val consumerTokens = setOf<String>("sample")
 
    override fun intercept(record: ConsumerRecord<K, V>, consumer: Consumer<K, V>): ConsumerRecord<K, V>? {
        if (isNotSupportedMessage(record)) {
            consumer.commitAsync()
            return null
        }
        return record
    }

    private fun isNotSupportedMessage(record: ConsumerRecord<K, V>): Boolean {
        val headerIterator: Iterator<Header> = record.headers()
            .headers(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).iterator()
        if (!headerIterator.hasNext()) {
            return false
        }
        val token = String(headerIterator.next().value(), StandardCharsets.UTF_8)
        return !consumerTokens.contains(token)
    }
}

https://github.com/spring-projects/spring-kafka/pull/2727 看来所做的更改是为了确保调用 errorHandler。

这是否意味着以前由我的自定义拦截器处理的功能现在应该在 errorHandler 中实现?

spring spring-kafka
1个回答
0
投票

您需要将

ErrorHandlingDeserializer
setFailedDeserializationFunction()
一起使用。如果未提供该函数,则
ErrorHandlingDeserializer
返回
null

还有

ConsumerProperties.checkDeserExWhenValueNull
房产供您考虑。但这仍然会在拦截器之前抛出异常。

© www.soinside.com 2019 - 2024. All rights reserved.