KafkaException: class is not an instance of org.apache.kafka.common.serialization.Deserializer


I want to implement a Kafka producer and consumer that send and receive Java serialized objects. I tried this:

Producer:

@Configuration
public class KafkaProducerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Bean
public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactory.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    /*
        Serialization configuration
     */
    return new DefaultKafkaProducerFactory<>(configProps);
}


@Bean
public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
    return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

}

Send object:

@Autowired
private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;

private static String topic = "tp-sale";

private void perform(){

    SaleRequestFactory obj = new SaleRequestFactory();
    obj.setId(100);

    ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj);
}

Consumer:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    private String groupId = "test";

    @Bean
    public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactory.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

// Receive object

@KafkaListener(topics = "tp-sale")
public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {

    System.out.println(tf.getId());

    SaleResponseFactory resObj = new SaleResponseFactory();
    resObj.setUnique_id("123123");

    return resObj;
}

When I deploy the producer, I get this error during deployment:

Caused by: org.apache.kafka.common.KafkaException: class org.engine.plugin.transactions.factory.SaleResponseFactory is not an instance of org.apache.kafka.common.serialization.Deserializer

Custom objects:

import org.apache.kafka.common.serialization.Serializer;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleRequestFactory implements Serializable, Serializer {

    private static final long serialVersionUID = 1744050117179344127L;
    
    private int id;

    @Override
    public byte[] serialize(String s, Object o) {
        return new byte[0];
    }
}

import org.apache.kafka.common.serialization.Deserializer;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleResponseFactory implements Serializable, Deserializer {

private static final long serialVersionUID = 1744050117179344127L;

private String unique_id;

@Override
public Object deserialize(String s, byte[] bytes) {
    return null;
}

}


Do you know how I can fix this issue?

***EDIT:***  I tried this:

***Producer:***

    @Configuration
    public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new DefaultKafkaProducerFactory<>(configProps);
    }


    @Bean
    public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
 }


***Send object:***
  
    @Autowired
    private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;

    private static String topic = "tp-sale";

    private void perform(){

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);

        ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj);
    }

***Consumer:***

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

private String groupId = "test";

@Bean
public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

}


// Receive Object

    @KafkaListener(topics = "tp-sale")
    public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {

        System.out.println(tf.getId());

        SaleResponseFactory resObj = new SaleResponseFactory();
        resObj.setUnique_id("123123");

        return resObj;
    }


***Custom objects*** 
    
        @Getter
        @Setter
        @NoArgsConstructor
        @AllArgsConstructor
        @Builder(toBuilder = true)
        public class SaleRequestFactory implements Serializable{
        
            private static final long serialVersionUID = 1744050117179344127L;
            
            private int id;
        }
    
    public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {
    
        @Override
        public byte[] serialize(String topic, SaleRequestFactory data) {
            // convert data to byte[]
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try
            {
                ObjectOutputStream outputStream = new ObjectOutputStream(out);
                outputStream.writeObject(data);
                out.close();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
    
            return out.toByteArray();
        }
    }
    
    
        @Getter
        @Setter
        @NoArgsConstructor
        @AllArgsConstructor
        @Builder(toBuilder = true)
        public class SaleResponseFactory implements Serializable{
        
            private static final long serialVersionUID = 1744050117179344127L;
        
            private String unique_id;
        }
    
    public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleResponseFactory> {
    
        @Override
        public SaleResponseFactory deserialize(String topic, byte[] data) {
            // convert data to SaleResponseFactory
            SaleResponseFactory saleResponseFactory = null;
            try
            {
                ByteArrayInputStream bis = new ByteArrayInputStream(data);
                ObjectInputStream in = new ObjectInputStream(bis);
                saleResponseFactory = (SaleResponseFactory) in.readObject();
                in.close();
            }
            catch (IOException | ClassNotFoundException e)
            {
                e.printStackTrace();
            }
            return saleResponseFactory;
        }
    }


When I try to send a message, I get this error:

    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale-0 at offset 0. If needed, please seek past the record to continue consumption.
    Caused by: java.lang.ClassCastException: null
    21:27:51.152 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:296] - Commit list: {}
    21:27:51.153 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
    java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
            at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)
            at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1265)
            at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1022)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.lang.Thread.run(Thread.java:835)
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale-0 at offset 0. If needed, please seek past the record to continue consumption.
    Caused by: java.lang.ClassCastException: null


Do you know how I can fix this issue?

***EDIT:***
I managed to implement these improvements:

***Producer:***

    @Configuration
    public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new DefaultKafkaProducerFactory<>(configProps);
    }


    @Bean
    public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

***Send object:***
  
    @Autowired
    private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;

    private static String topic = "tp-sale";

    private void perform(){

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);

        ListenableFuture<SendResult<String, SaleRequestFactory>> send = saleRequestFactoryKafkaTemplate.send(topic, obj);
    }

***Consumer:***

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

private String groupId = "test";

@Bean
public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

}


***Receive Object***

    @KafkaListener(topics = "tp-sale")
    public SaleResponseFactory transactionElavonAuthorizeProcess(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {

        System.out.println(tf.getId());

        SaleResponseFactory resObj = new SaleResponseFactory();
        resObj.setUnique_id("123123");

        return resObj;
    }


***Custom objects*** 
    
        @Getter
        @Setter
        @NoArgsConstructor
        @AllArgsConstructor
        @Builder(toBuilder = true)
        public class SaleRequestFactory implements Serializable{
        
            private static final long serialVersionUID = 1744050117179344127L;
            
            private int id;
        }
    
    public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {
    
        @Override
        public byte[] serialize(String topic, SaleRequestFactory data) {
            // convert data to byte[]
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try
            {
                ObjectOutputStream outputStream = new ObjectOutputStream(out);
                outputStream.writeObject(data);
                out.close();
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
    
            return out.toByteArray();
        }
    }
    
    
        @Getter
        @Setter
        @NoArgsConstructor
        @AllArgsConstructor
        @Builder(toBuilder = true)
        public class SaleResponseFactory implements Serializable{
        
            private static final long serialVersionUID = 1744050117179344127L;
        
            private String unique_id;
        }
    
    public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleResponseFactory> {
    
        @Override
        public SaleResponseFactory deserialize(String topic, byte[] data) {
            // convert data to SaleResponseFactory
            SaleResponseFactory saleResponseFactory = null;
            try
            {
                ByteArrayInputStream bis = new ByteArrayInputStream(data);
                ObjectInputStream in = new ObjectInputStream(bis);
                saleResponseFactory = (SaleResponseFactory) in.readObject();
                in.close();
            }
            catch (IOException | ClassNotFoundException e)
            {
                e.printStackTrace();
            }
            return saleResponseFactory;
        }
    }

When I send a message, I get this in the log:

    13:03:53.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:296] - Listener method returned result [org.factory.SaleResponseFactory@69c400ab] - generating response message for it
    13:03:53.675 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG RecordMessagingMessageListenerAdapter[debug:296] - No replyTopic to handle the reply: org.factory.SaleResponseFactory@69c400ab

Do you know how I can solve this issue?
java spring spring-boot apache-kafka spring-kafka
3 Answers

2 votes

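You are casting the deserialized object to a different type than the one you serialized. I am not sure why you need to do that. You can update the deserializer to look like this: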

public class SaleRequestFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> {

    @Override
    public SaleRequestFactory deserialize(String topic, byte[] data) {
        ...
        saleRequestFactory = (SaleRequestFactory) in.readObject();

    }
}

java.lang.ClassCastException: null

This also means that your serialization is not working as expected. Make sure you have a valid payload before attempting the cast.
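The point about matching types can be reproduced without Kafka. The following is a minimal sketch using only the JDK; `SaleRequest`, `SaleResponse`, and `RoundTrip` are hypothetical stand-ins for the classes in the question, not part of any library. It serializes an object the same way the question's serializer does, then checks the payload and its type before casting, so a mismatch produces a clear message instead of a bare `ClassCastException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for SaleRequestFactory.
class SaleRequest implements Serializable {
    private static final long serialVersionUID = 1L;
    final int id;
    SaleRequest(int id) { this.id = id; }
}

// Hypothetical stand-in for SaleResponseFactory.
class SaleResponse implements Serializable {
    private static final long serialVersionUID = 1L;
    final String uniqueId;
    SaleResponse(String uniqueId) { this.uniqueId = uniqueId; }
}

class RoundTrip {
    // Java serialization into a byte array, as in the question's serializer.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the object back and verifies the payload and its type before casting.
    static <T> T fromBytes(byte[] data, Class<T> expected) {
        if (data == null || data.length == 0) {
            throw new IllegalArgumentException("empty payload");
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            Object value = in.readObject();
            if (!expected.isInstance(value)) {
                String actual = (value == null) ? "null" : value.getClass().getName();
                throw new IllegalStateException(
                        "expected " + expected.getName() + " but payload contains " + actual);
            }
            return expected.cast(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(new SaleRequest(100));
        // Same type on both sides: the id survives the round trip.
        System.out.println(fromBytes(bytes, SaleRequest.class).id);
        try {
            // Different type on the reading side: rejected before the cast.
            fromBytes(bytes, SaleResponse.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the question, the topic carries `SaleRequestFactory` objects while the consumer is configured to deserialize `SaleResponseFactory`, which is the mismatched case shown in the second call.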


1 vote

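KEY_DESERIALIZER_CLASS_CONFIG names the class that deserializes the bytes received over the network into the appropriate Java class. The class you supplied does not do that.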

In most cases StringDeserializer is used for the key. Specify an appropriate deserializer in the factory properties.
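As a rough illustration of why a payload class cannot be used as a deserializer setting, here is a JDK-only sketch. The `Deserializer` interface below is a local stand-in with the same shape as Kafka's, and `SalePayload`, `TextDeserializer`, and `DeserializerCheck` are hypothetical names; Kafka's real check may differ in detail. The idea is that the configured class is instantiated reflectively and must implement the deserializer interface, otherwise configuration fails with an "is not an instance of" error.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the shape of Kafka's Deserializer (assumption for illustration).
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// A plain payload class: it carries data but cannot deserialize anything.
class SalePayload {
    int id;
}

// A class that is suitable as a deserializer setting.
class TextDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}

class DeserializerCheck {
    // Instantiates the configured class and verifies that it implements the interface.
    static Deserializer<?> instantiate(Class<?> configured) {
        Object instance;
        try {
            instance = configured.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + configured.getName(), e);
        }
        if (!(instance instanceof Deserializer)) {
            throw new IllegalStateException(
                    configured.getName() + " is not an instance of " + Deserializer.class.getName());
        }
        return (Deserializer<?>) instance;
    }

    public static void main(String[] args) {
        // Accepted: the class implements the interface.
        Deserializer<?> ok = instantiate(TextDeserializer.class);
        System.out.println(ok.deserialize("tp-sale", "hello".getBytes(StandardCharsets.UTF_8)));
        try {
            // Rejected: a payload class is not a deserializer.
            instantiate(SalePayload.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The practical consequence is the one the question's later edit arrives at: keep the payload class a plain `Serializable` object and put the conversion logic in a separate class that implements the deserializer interface.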


1 vote

For those who configure Kafka through application.properties, make sure it looks like this:

spring.kafka.consumer.bootstrap-servers=localhost:8888
spring.kafka.consumer.group-id=message
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.bootstrap-servers=localhost:8888
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer