Kafka消费者ClassNotFoundException

问题描述 投票:0回答:1

(在开始提问之前,我的英语可能不足以清楚地描述所有内容。如果您不明白,请告诉我。)

我正在尝试通过 Kafka 将数据对象从 A spring 项目(生产者)发送到 B spring 项目(消费者)。

问题是 A 和 B 中的数据对象具有不同的类路径。所以B项目数据类无法映射A项目的字段。

但是两个对象具有相同的字段。所以我想从 A 项目获取对象作为 B 项目的参数。

错误信息

Listener failed; nested exception is 

org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is 

org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.springboot.DTO.kafka.PostViewCountDTO]; nested exception is 

java.lang.ClassNotFoundException: com.example.springboot.DTO.kafka.PostViewCountDTO

build.gradle

    implementation 'org.apache.kafka:kafka-clients:2.8.0'
    implementation 'org.apache.kafka:kafka_2.13:2.8.0'
    implementation 'org.springframework.boot:spring-boot-starter-web:2.5.3'

数据类(使用A和B项目)

public class PostViewCountDTO implements Serializable {
    private static final long serialVersionUID = 1L;

    @NotNull
    private long postNo;
}

生产者配置

@Configuration
public class PostViewProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public Map<String,Object> postViewProducerConfigs() {
        return JsonSerializer.getStringObjectMap(bootstrapServer);
    }

    @Bean
    public ProducerFactory<String, PostViewCountDTO> postViewCountDTOProducerFactory() {
        return new DefaultKafkaProducerFactory<>(postViewProducerConfigs());
    }

    @Bean
    public KafkaTemplate<String, PostViewCountDTO> postViewDTOKafkaTemplate() {
        return new KafkaTemplate<>(postViewCountDTOProducerFactory());
    }
}

常用JsonSerializer类

public class JsonSerializer {

    static Map<String, Object> getStringObjectMap(String bootstrapServer) {
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class);

        return props;
    }
}

消费者配置

@Configuration
@RequiredArgsConstructor
public class PostViewConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public Map<String,Object> postViewConsumerConfigs() {
        return JsonDeserializer.getStringObjectMap(bootstrapServer);
    }

    @Bean
    public ConsumerFactory<String, PostViewCountDTO> postViewCountDTO_ConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(postViewConsumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PostViewCountDTO> postViewCountListener() {
        ConcurrentKafkaListenerContainerFactory<String, PostViewCountDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postViewCountDTO_ConsumerFactory());
        return factory;
    }

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }
}

生产

    @Async
    public void sendPostNo(PostViewCountDTO postViewCountDTO) {
        postViewKafkaTemplate.send(topic_viewCount, null, postViewCountDTO);
    }

消费

    @KafkaListener(topics = topic_viewCount, groupId = groupId, containerFactory = "postViewCountListener")
    public void consume(@Payload PostViewCountDTO postViewCountDTO) {
        ...
    }
java apache-kafka spring-kafka
1个回答
7
投票

您需要向序列化器和反序列化器添加类型映射

https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#serdes-mapping-types

在生产者方面,将

com.a.PostViewCountDTO
映射到
PostViewCountDTO
。 在消费者方面,将
com.b.PostViewCountDTO
映射到
PostViewCountDTO

© www.soinside.com 2019 - 2024. All rights reserved.