我正在尝试在课堂上使用
@KafkaListener
,并在args中使用@KafkaHandler
和@Payload
来验证消息,我需要这个,因为主题有不止一种类型的Json
。但我有一些错误。首先我得到:
Caused by: java.lang.IllegalArgumentException: The class 'br.com.producer.chave.model.Chave' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
但是,
br.com.producer.chave.model.Chave
是Producer
中的模型,我在消费者中,是另一个项目,无论如何,我在KafkaConfig
解决了这个问题
configProps.put(JsonDeserializer.TRUSTED_PACKAGES,"*");
但是
Deserializer
试图在我的消费者中找到这个类,我得到了另一个错误:
引起:java.lang.ClassNotFoundException:br.com.producer.chave.model.Chave
我知道当 Json 在生产者中序列化数据时,它会将类型添加到标头中,我可以禁用它。
所以,我正在阅读文档并说要在生产者和消费者中添加
JsonDeserializer.TYPE_MAPPINGS
,但我得到了同样的错误。这是我的课程:
配置生产者: obs:我有 5 种不同的类型,但在 TYPE_MAPPINGS 中我只放了两种用于测试
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.producer.security.protocol}")
private String sslSecurityProtocol;
@Value("${kafka.producer.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${kafka.producer.properties.sasl.jaas.config}")
private String jaasConfig;
@Value("${kafka.value-serializer}")
private String valueSerializer;
@Value("${kafka.key-serializer}")
private String keySerializer;
@Value("${kafka.client-id}")
private String clientId;
@Bean
public ProducerFactory<String,Object> producerFactory() {
JsonSerializer serializer = new JsonSerializer();
Map<String, Object> configProps = new HashMap<>();
configProps.put(JsonSerializer.TYPE_MAPPINGS,
"chave:br.com.producer.chave.model.Chave," +
"cpfCnpj:br.com.producer.cpfcnpj.model.CpfCnpj");
serializer.configure(configProps, false);
return new DefaultKafkaProducerFactory<>(configProperties(), new StringSerializer(), serializer);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<String, Object>(producerFactory());
}
@Bean
public KafkaAdmin kafkaAdmin() {
return new KafkaAdmin(configProperties());
}
private Map<String,Object> configProperties() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, sslSecurityProtocol);
configProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
configProps.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
return configProps;
}
}
配置消费者:
@Configuration
@Slf4j
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.group-id}")
private String groupId;
@Value("${kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.security.protocol}")
private String sslSecurityProtocol;
@Value("${kafka.consumer.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${kafka.consumer.properties.sasl.jaas.config}")
private String jaasConfig;
@Value("kafka.consumer.enable-auto-commit")
private String autoCommit;
@Value("${kafka.consumer.ssl.protocol}")
private String sslProtocol;
@Value("${kafka.consumer.ssl.trust-store-location}")
private String sslTruststoreLocation;
@Value("${kafka.consumer.ssl.trust-store-password}")
private String sslTruststorePassword;
@Value("${kafka.consumer.ssl.trust-store-type}")
private String sslTruststoreType;
@Value("${kafka.consumer.client-id}")
private String clientId;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setSyncCommits(Boolean.TRUE);
return factory;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
JsonDeserializer deserializer = new JsonDeserializer();
Map<String, Object> configProps = new HashMap<>();
configProps.put(JsonDeserializer.TYPE_MAPPINGS,
"chave:br.com.consumer.psib.domain.model.Chave," +
"cpfCnpj:br.com.consumer.psib.domain.model.CpfCnpj");
configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
deserializer.configure(configProps, false);
return new DefaultKafkaConsumerFactory<>(configProperties(), new StringDeserializer(), deserializer,true);
}
@Bean
public KafkaAdmin kafkaAdmin() {
return new KafkaAdmin(configProperties());
}
private Map<String, Object> configProperties() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.valueOf(autoCommit));
return configProps;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(configProperties());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
这是我的听众:
@Slf4j
@Component
@KafkaListener(
topics = "${kafka.consumer.topics.chave-pix-negativa.topic}",
groupId = "${kafka.consumer.group-id}"
)
public class listener {
@Autowired
private IListaNegativaInsertUseCase<Chave> chaveInsertUseCase;
@Autowired
private IListaNegativaDeleteUseCase<Chave> chaveDeleteUseCase;
@Autowired
private IListaNegativaInsertUseCase<CpfCnpj> cpfCnpjInsertUseCase;
@Autowired
private IListaNegativaDeleteUseCase<CpfCnpj> cpfCnpjDeleteUseCase;
@KafkaHandler
public void listenerCpfCnpj(@Payload CpfCnpj message, Acknowledgment acknowledgment) {
//otherthing
}
@KafkaHandler
public void listenerLista(@Payload Chave message, Acknowledgment acknowledgment) {
//something
}
@KafkaHandler(isDefault = true)
public void defaulthander (@Payload String m){
log.error("NAO FOI POSSIVEL {}", m);
}
}
有了这个配置,TYPE_MAPPINGS 不起作用,我不知道是我错过了什么还是我用错了
提前致谢!
我希望 @KafkaHandler 验证了 paylod 并根据消息的类型转到正确的处理程序
如果您依赖于目标方法的类型推断(这确实是
@KafkaHandler
的目的),那么您不能使用 JsonDeserializer
并依赖能够将类型反序列化为的 JsonMessageConverter
从它将要用于的方法。在这种情况下,您必须将反序列化器保留为普通的ByteArrayDeserializer
.
在文档中查看更多信息:https://docs.spring.io/spring-kafka/reference/html/#messaging-message-conversion
那么是的:您可以在生产者方面禁用类型填充。