Kafka - Deserializing the object in the Consumer

Question  Votes: 0  Answers: 2

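We are considering Kafka for our messaging, and our applications are developed with Spring, so we plan to use spring-kafka.

The producer puts each message on the queue as a HashMap object. We have configured the JSON serializer and assume the map will be serialized before it is put on the queue. Here is the producer configuration: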

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

On the other side, we have a listener on the same topic the producer publishes to. Here is the consumer configuration:

spring:
  kafka:
    consumer:
      group-id: xyz
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

Our listener method:

  public void listener(SomeClass abx)

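We expected the JSON to be deserialized into an object of type SomeClass. Instead, it throws a deserialization exception.

We have seen a few articles that suggest doing something like this: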

  @Bean
  public ConsumerFactory<String, Car> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
        new JsonDeserializer<>(Car.class));
  }

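We would rather not write code to create the deserializer. Is there some boilerplate configuration we are missing? Any help would be much appreciated!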

java apache-kafka spring-kafka
2 Answers
22
votes

See the Spring Boot documentation. In particular:

You can also configure the Spring Kafka JsonDeserializer as follows:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice

spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
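Since the question uses YAML configuration, the same settings could be written roughly as below. This is a sketch rather than a tested configuration. The package `com.example` for `SomeClass` is an assumption, and `spring.json.use.type.headers` is set to `false` on the assumption that the type header added by the producer's `JsonSerializer` (which would name `HashMap` here) should not take precedence over the default type.

```yaml
spring:
  kafka:
    consumer:
      group-id: xyz
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # Assumption: SomeClass lives in the hypothetical package com.example
        spring.json.value.default.type: com.example.SomeClass
        spring.json.trusted.packages: com.example
        # Assumption: ignore the producer's type header and always use the default type
        spring.json.use.type.headers: false
```

Everything under `properties` is passed through to the Kafka consumer as-is, so no deserializer bean has to be written in code. If the key is also read with `JsonDeserializer`, it would need `spring.json.key.default.type` in the same way.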


0
votes
I was consuming events from a remote Kafka producer and was getting a ClassNotFoundException.

In the end I removed the deserializer configuration from the .properties file and added the config class below to the consumer.

Here is my application.properties:
spring.application.name=payment-service
server.port=8082
spring.kafka.payment.bootstrap-servers=localhost:9092
spring.kafka.order.consumer.group-id.notification=group-id
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.order.topic.create-order=new_order1


  
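        import java.util.HashMap;
        import java.util.Map;
        import java.util.UUID;

        import com.swiggy.payment.event.OrderEvent;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.common.serialization.StringDeserializer;
        import org.springframework.beans.factory.annotation.Value;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.kafka.annotation.EnableKafka;
        import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
        import org.springframework.kafka.core.ConsumerFactory;
        import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
        import org.springframework.kafka.listener.ContainerProperties;
        import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
        import org.springframework.kafka.support.serializer.JsonDeserializer;

        @EnableKafka
        @Configuration("NotificationConfiguration")
        public class CreateOrderConsumerConfig {
            @Value("${spring.kafka.payment.bootstrap-servers}")
            private String bootstrapServers;
            @Value("${spring.kafka.order.consumer.group-id.notification}")
            private String groupId;

            @Bean("NotificationConsumerFactory")
            public ConsumerFactory<String, OrderEvent> createOrderConsumerFactory() {
                Map<String, Object> props = new HashMap<>();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
                // The consumer uses ErrorHandlingDeserializer, which wraps the real (delegate) deserializers.
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
                // Delegates must be deserializers (the Serializer classes here were a mistake).
                props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
                props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
                props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.swiggy.payment.event.OrderEvent"); // this is my consumer event class
                // Ignore the producer's type headers and always deserialize to the default type.
                props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
                props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

                // Everything is configured through props, so no deserializer instances are passed in;
                // instances given to the constructor would replace the classes configured above.
                return new DefaultKafkaConsumerFactory<>(props);
            }

            @Bean("NotificationContainerFactory")
            public ConcurrentKafkaListenerContainerFactory<String, OrderEvent> createOrderKafkaListenerContainerFactory() {
                ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
                factory.setConsumerFactory(createOrderConsumerFactory());
                // Offsets are committed only when the listener acknowledges the record.
                factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
                return factory;
            }
        }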