在 KafkaListener 上定义自定义值反序列化器

问题描述 投票:0回答:1

在我的主要

application.properties
中,我有:

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

一切正常,并且值按预期反序列化为

String

现在,在我的集成测试中,我想引入另一个

KafkaListener
(即第二个侦听器,我不想覆盖主应用程序中侦听器的行为!),但这次使用另一个值反序列化器(到字节数组)。这是否可能,而无需为此侦听器引入自定义
ListenerContainerFactory

我尝试了以下方法,但没有成功:

@KafkaListener(topics = ..., properties = "value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
    ...
}

我得到:

java.lang.ClassCastException: java.lang.String cannot be cast to [B

这意味着未使用集成测试中

KafkaListener
上定义的临时值解串器。

我目前使用的是 spring-kafka 2.5.8。

apache-kafka spring-kafka
1个回答
3
投票

properties
需要是字符串列表,并且可以引用常量

@KafkaListener(topics = ..., properties = {
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + "=org.apache.kafka.common.serialization.ByteArrayDeserializer"
})
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
    ...
}

https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-properties

这将正确使用

value.deserializer
作为键(点,而不是连字符)

或者在集成测试中,您可以加载不同的属性文件。

© www.soinside.com 2019 - 2024. All rights reserved.