在我的主要
application.properties
中,我有:
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
一切正常,并且值按预期反序列化为
String
。
现在,在我的集成测试中,我想引入另一个
KafkaListener
(即第二个侦听器,我不想覆盖主应用程序中侦听器的行为!),但这次使用另一个值反序列化器(到字节数组)。这是否可能,而无需为此侦听器引入自定义 ListenerContainerFactory
?
我尝试了以下方法,但没有成功:
@KafkaListener(topics = ..., properties = "value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
...
}
我得到:
java.lang.ClassCastException: java.lang.String cannot be cast to [B
这意味着未使用集成测试中
KafkaListener
上定义的临时值解串器。
我目前使用的是 spring-kafka 2.5.8。
properties
需要是字符串列表,并且可以引用常量
@KafkaListener(topics = ..., properties = {
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + "=org.apache.kafka.common.serialization.ByteArrayDeserializer"
})
public void receiveKafkaRecords(final ConsumerRecord<String, byte[]> record) {
...
}
https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-properties
这将正确使用
value.deserializer
作为键(点,而不是连字符)
或者在集成测试中,您可以加载不同的属性文件。