java中kafka主题的对象序列化错误

问题描述 投票:0回答:1

我正在尝试序列化事件对象以生成 kafka 事件。 该项目的所有代码都可以在这里找到:

https://github.com/Gaboxondo/springBootCQRS示例

当我尝试发布事件 AccountOpenedEvent 时,出现以下错误:

ERROR 26516 --- [nio-8090-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.mashosoft.AccountCommand.domain.events.model.AccountOpenedEvent to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer] with root cause

java.lang.ClassCastException: class com.mashosoft.AccountCommand.domain.events.model.AccountOpenedEvent cannot be cast to class java.lang.String (com.mashosoft.AccountCommand.domain.events.model.AccountOpenedEvent is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

我已经阅读了很多有关此错误的内容,但我并不真正理解它。 我也看到了这篇文章,但说实话,我仍然不太明白这个问题 https://www.baeldung.com/java-classcastException

我知道 maily 是一个由于继承而产生的问题,也可能是 lombok,但仍然不知道如何解决它

课程如下:

@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class AccountOpenedEvent extends BaseEvent {
    private String accountHolder;
    private Date creationDate;
    private Double openingBalance;
}

和基本事件抽象类:

@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public abstract class BaseEvent {
    private String id;
    private int version;
}

kafka的配置如下:

    kafka:
      bootstrap-servers: localhost:9092
      producer:
        bootstrap-servers: localhost:9092
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

正如我所说,所有代码都在 github 链接上并且是公开的。您可以运行 docker compose,然后就像运行主类一样简单:

enter image description here

与kafka的连接完美,主题自动创建完成。 enter image description here

您可以在发送创建请求时遇到序列化错误,然后当kafka spring尝试序列化对象时失败 enter image description here

enter image description here

非常感谢您提前提供的所有帮助。


更新:如果我手动进行kafka模板配置,它就可以工作

@Configuration
public class KafkaProducerConfig {

    @Bean
    public KafkaTemplate<String, BaseEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory(), true);
    }

    @Bean
    public ProducerFactory<String, BaseEvent> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // Use JsonSerializer for complex objects

        return new DefaultKafkaProducerFactory<>(configProps);
    }
}

我仍然想知道我是否使用了错误的属性,或者我对属性做了什么错误

java spring-boot serialization apache-kafka
1个回答
0
投票

我查看了您提交到存储库的代码,并注意到明显的问题在于 application.yaml 配置文件中的缩进。您的配置中缺少空间。请仔细看下面的截图。

错了

如果您想知道“如果出现错误,它如何能够连接到服务器?”很简单:由于 Spring Kafka 的默认配置,它使用端口 9092 和本地服务器。

© www.soinside.com 2019 - 2024. All rights reserved.