我的四个不同的@KafkaListers有一个共同的工厂,每个监听器都应该从其各自的主题中消耗其各自的JSON对象。我不能让它工作,因为我得到的异常是:他的类'com.abc.MyObject'不在受信任包中。[java.util, java.lang]。如果你认为这个类可以安全地反序列化,请提供它的名字。如果序列化只是由一个受信任的源完成,你也可以启用信任所有(*)。即使我添加了下面的内容,我仍然得到了同样的异常:config.put(JsonDeserializer.TRUSTED_PACKAGES,"");如果我不使用通用工厂,而是将其用于一个特定的对象,那么该对象可以正常工作,但这样一来,我将不得不为四个不同的Kafkalisteners创建四个工厂。
我的Kafka列表器。
@KafkaListener( topics="number_1_event", groupId="abc-group", containerFactory="kafkaABCListenerContainerFactory")
public void consumeMyMessage(MyTopic1Class data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
@Header(KafkaHeaders.OFFSET) int offsetId) {
// code here
}
@KafkaListener( topics="number_2_event", groupId="abc2-group", containerFactory="kafkaABCListenerContainerFactory")
/* Similar method signature here*/
@KafkaListener( topics="number_3_event", groupId="abc3-group", containerFactory="kafkaABCListenerContainerFactory")
@KafkaListener( topics="number_4_event", groupId="abc4-group", containerFactory="kafkaABCListenerContainerFactory")
配置:
@Bean
public ConsumerFactory<String, Object> abcdConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(Object.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaABCListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcdConsumerFactory());
return factory;
}
我目前正在使用上述配置,我得到了上面提到的信任包错误。如果我将上述配置中的Object替换为MyTopic1Class,或者MyTopic2Class。 然后运行它,它将为该特定对象工作就好了。 请帮助我!
你的意思是我可以只使用
ConcurrentKafkaListenerContainerFactory<String, MyTopic1Class>
和ConsumerFactory<String, MyTopic1Class> abcdConsumerFactory()
......当我的number_1_event主题接收到MyTopic1Class时,这个工厂签名可以工作。当话题 number_2_event 收到 MyTopic2Class 并使用相同的工厂签名时,它是否可以工作?
是的,它可以用,但用起来更干净。
ConcurrentKafkaListenerContainerFactory<String, Object> abcdContainerFactory()
ConsumerFactory<String, Object> abcdConsumerFactory()
你的听众可以消费较窄的类型 MyTopic1Class
, MyTopic2Class
等。
工厂上的通用类型只在编译时使用,它们在运行时被清除。