spring-kafka多个@KafkaListener用不同的Json对象不能工作。

问题描述 投票:0回答:1

我的四个不同的@KafkaListers有一个共同的工厂,每个监听器都应该从其各自的主题中消耗其各自的JSON对象。我不能让它工作,因为我得到的异常是:他的类'com.abc.MyObject'不在受信任包中。[java.util, java.lang]。如果你认为这个类可以安全地反序列化,请提供它的名字。如果序列化只是由一个受信任的源完成,你也可以启用信任所有(*)。即使我添加了下面的内容,我仍然得到了同样的异常:config.put(JsonDeserializer.TRUSTED_PACKAGES,"");如果我不使用通用工厂,而是将其用于一个特定的对象,那么该对象可以正常工作,但这样一来,我将不得不为四个不同的Kafkalisteners创建四个工厂。

我的Kafka列表器。

@KafkaListener( topics="number_1_event", groupId="abc-group", containerFactory="kafkaABCListenerContainerFactory")
public void consumeMyMessage(MyTopic1Class data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
            @Header(KafkaHeaders.OFFSET) int offsetId) {
                // code here
            }



@KafkaListener( topics="number_2_event", groupId="abc2-group", containerFactory="kafkaABCListenerContainerFactory")
/* Similar method signature here*/

@KafkaListener( topics="number_3_event", groupId="abc3-group", containerFactory="kafkaABCListenerContainerFactory")

@KafkaListener( topics="number_4_event", groupId="abc4-group", containerFactory="kafkaABCListenerContainerFactory")

配置:

@Bean
public ConsumerFactory<String, Object> abcdConsumerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");


    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
            new JsonDeserializer<>(Object.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaABCListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();      
    factory.setConsumerFactory(abcdConsumerFactory());

    return factory;
    }

我目前正在使用上述配置,我得到了上面提到的信任包错误。如果我将上述配置中的Object替换为MyTopic1Class,或者MyTopic2Class。 然后运行它,它将为该特定对象工作就好了。 请帮助我!

java spring-boot apache-kafka spring-kafka
1个回答
0
投票

你的意思是我可以只使用 ConcurrentKafkaListenerContainerFactory<String, MyTopic1Class>ConsumerFactory<String, MyTopic1Class> abcdConsumerFactory()......当我的number_1_event主题接收到MyTopic1Class时,这个工厂签名可以工作。当话题 number_2_event 收到 MyTopic2Class 并使用相同的工厂签名时,它是否可以工作?

是的,它可以用,但用起来更干净。

ConcurrentKafkaListenerContainerFactory<String, Object> abcdContainerFactory()
ConsumerFactory<String, Object> abcdConsumerFactory()

你的听众可以消费较窄的类型 MyTopic1Class, MyTopic2Class 等。

工厂上的通用类型只在编译时使用,它们在运行时被清除。

© www.soinside.com 2019 - 2024. All rights reserved.