我们正在考虑在我们的消息传递中使用 Kafka,我们的应用程序是使用 Spring 开发的。所以,我们计划使用spring-kafka。
生产者将消息作为HashMap对象放入队列中。我们有 JSON 序列化器,并且假设映射将被序列化并放入队列中。这是生产者配置。
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
另一方面,我们有一个监听器,它监听生产者发布消息的同一主题。这是消费者配置:
spring:
kafka:
consumer:
group-id: xyz
key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
我们的监听器方法:
public void listener(SomeClass abx)
我们期望 json 将被反序列化并生成“SomeClass”类型的对象。但显然,它会抛出反序列化异常。
我们看到了几篇文章,建议是做类似的事情:
@Bean
public ConsumerFactory<String, Car> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(Car.class));
}
我们不想编写一些代码来创建解串器。我们是否缺少任何样板文件? 任何帮助将不胜感激!!
请参阅启动文档。特别是:
您还可以按如下方式配置 Spring Kafka JsonDeserializer:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
I was consuming remote Kafka producer event and facing Class not found exception.
so finally I removed configuration form .properties file added below config class in consumer.
Here is my application.properties.
spring.application.name=payment-service
server.port=8082
spring.kafka.payment.bootstrap-servers= localhost:9092
spring.kafka.order.consumer.group-id.notification= group-id
spring.kafka.consumer.auto-offset-reset= latest
spring.kafka.order.topic.create-order=new_order1
@EnableKafka
@Configuration("NotificationConfiguration")
public class CreateOrderConsumerConfig {
@Value("${spring.kafka.payment.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.order.consumer.group-id.notification}")
private String groupId;
@Bean("NotificationConsumerFactory")
public ConsumerFactory<String, OrderEvent> createOrderConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.swiggy.payment.event.OrderEvent");// this my consumer event class
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS,false);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(OrderEvent.class));
}
@Bean("NotificationContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> createOrderKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createOrderConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}