Error when using @KafkaListener at the class level


I am trying to use @KafkaListener at the class level, with @KafkaHandler methods whose @Payload arguments select the message type. I need this because the topic carries more than one kind of JSON. But I am getting some errors.

First I get:

Caused by: java.lang.IllegalArgumentException: The class 'br.com.producer.chave.model.Chave' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

However, br.com.producer.chave.model.Chave is a model in the Producer, which is a separate project; I am on the consumer side. In any case, I worked around that error in my KafkaConfig with:

configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

But then the Deserializer tries to find that class in my consumer, and I get another error:

Caused by: java.lang.ClassNotFoundException: br.com.producer.chave.model.Chave

I know that when the JsonSerializer serializes data in the producer it adds the type to the message headers, and that I can disable that.
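For context, disabling those headers would look something like this in a producer config (a sketch only; JsonSerializer.ADD_TYPE_INFO_HEADERS corresponds to the spring.json.add.type.headers property, and the broker properties map here is a placeholder):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Sketch: a producer factory whose JsonSerializer does not write the
// __TypeId__ header carrying the fully qualified class name.
@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> serializerProps = new HashMap<>();
    serializerProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    JsonSerializer<Object> serializer = new JsonSerializer<>();
    serializer.configure(serializerProps, false); // false = configuring the value serializer

    Map<String, Object> producerProps = new HashMap<>(); // bootstrap servers etc. go here (placeholder)
    return new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), serializer);
}
```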

So I read the documentation, which says to configure TYPE_MAPPINGS (JsonSerializer.TYPE_MAPPINGS on the producer, JsonDeserializer.TYPE_MAPPINGS on the consumer), but I still get the same error. Here are my classes:

Producer config (note: I have 5 different types, but for testing I only put two of them in TYPE_MAPPINGS):


@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.producer.security.protocol}")
    private String sslSecurityProtocol;

    @Value("${kafka.producer.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${kafka.producer.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Value("${kafka.value-serializer}")
    private String valueSerializer;

    @Value("${kafka.key-serializer}")
    private String keySerializer;

    @Value("${kafka.client-id}")
    private String clientId;
    @Bean
    public ProducerFactory<String,Object> producerFactory() {
        JsonSerializer<Object> serializer = new JsonSerializer<>();
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(JsonSerializer.TYPE_MAPPINGS,
                        "chave:br.com.producer.chave.model.Chave," +
                        "cpfCnpj:br.com.producer.cpfcnpj.model.CpfCnpj");
        serializer.configure(configProps, false);
        return new DefaultKafkaProducerFactory<>(configProperties(), new StringSerializer(), serializer);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<String, Object>(producerFactory());
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(configProperties());
    }

    private Map<String,Object> configProperties() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, sslSecurityProtocol);
        configProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        configProps.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        return configProps;
    }
}


Consumer config:


@Configuration
@Slf4j
public class KafkaConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.security.protocol}")
    private String sslSecurityProtocol;

    @Value("${kafka.consumer.properties.sasl.mechanism}")
    private String saslMechanism;

    @Value("${kafka.consumer.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Value("${kafka.consumer.enable-auto-commit}")
    private String autoCommit;

    @Value("${kafka.consumer.ssl.protocol}")
    private String sslProtocol;

    @Value("${kafka.consumer.ssl.trust-store-location}")
    private String sslTruststoreLocation;

    @Value("${kafka.consumer.ssl.trust-store-password}")
    private String sslTruststorePassword;

    @Value("${kafka.consumer.ssl.trust-store-type}")
    private String sslTruststoreType;

    @Value("${kafka.consumer.client-id}")
    private String clientId;


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>
            kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setSyncCommits(Boolean.TRUE);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        JsonDeserializer<Object> deserializer = new JsonDeserializer<>();
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(JsonDeserializer.TYPE_MAPPINGS,
                "chave:br.com.consumer.psib.domain.model.Chave," +
                        "cpfCnpj:br.com.consumer.psib.domain.model.CpfCnpj");
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        deserializer.configure(configProps, false);
        return new DefaultKafkaConsumerFactory<>(configProperties(), new StringDeserializer(), deserializer,true);
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(configProperties());
    }


    private Map<String, Object> configProperties() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.valueOf(autoCommit));

        return configProps;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(configProperties());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

And this is my listener:

@Slf4j
@Component
@KafkaListener(
        topics = "${kafka.consumer.topics.chave-pix-negativa.topic}",
        groupId = "${kafka.consumer.group-id}"
)
public class listener {

    @Autowired
    private IListaNegativaInsertUseCase<Chave> chaveInsertUseCase;
    @Autowired
    private IListaNegativaDeleteUseCase<Chave> chaveDeleteUseCase;

    @Autowired
    private IListaNegativaInsertUseCase<CpfCnpj> cpfCnpjInsertUseCase;

    @Autowired
    private IListaNegativaDeleteUseCase<CpfCnpj> cpfCnpjDeleteUseCase;


    @KafkaHandler
    public void listenerCpfCnpj(@Payload CpfCnpj message, Acknowledgment acknowledgment) {
       //otherthing
    }


    @KafkaHandler
    public void listenerLista(@Payload Chave message, Acknowledgment acknowledgment) {
     //something

    }

    @KafkaHandler(isDefault = true)
    public void defaulthander (@Payload String m){
        log.error("NAO FOI POSSIVEL {}", m);
    }
}

With this configuration, TYPE_MAPPINGS does not work, and I don't know whether I missed something or am using it incorrectly.

Thanks in advance!

I expected @KafkaHandler to inspect the payload and route each message to the correct handler based on its type.

java spring apache-kafka kafka-consumer-api spring-kafka
1 Answer

If you rely on type inference from the target method (which is really the purpose of @KafkaHandler), then you cannot use a JsonDeserializer; you have to rely on a JsonMessageConverter, which is able to deserialize into the type inferred from the method it is going to call. In that case you must leave the value deserializer as a plain ByteArrayDeserializer.

See the documentation for more information: https://docs.spring.io/spring-kafka/reference/html/#messaging-message-conversion

And then, yes: you can disable type-header population on the producer side.
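A sketch of what that setup could look like, assuming spring-kafka's ByteArrayJsonMessageConverter (bootstrap server and group id here are placeholders, and newer spring-kafka versions rename setMessageConverter to setRecordMessageConverter):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;

@Configuration
public class ConverterBasedKafkaConfig {

    @Bean
    public ConsumerFactory<String, byte[]> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
        // The record value stays as raw bytes; no JSON work happens here.
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(), new ByteArrayDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // The converter deserializes the bytes into the parameter type of the
        // @KafkaHandler method that is selected.
        factory.setMessageConverter(new ByteArrayJsonMessageConverter());
        return factory;
    }
}
```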
