我正在尝试序列化事件对象以生成 kafka 事件。 该项目的所有代码都可以在这里找到:
https://github.com/Gaboxondo/springBootCQRS示例
当我尝试发布事件 AccountOpenedEvent 时,出现以下错误:
ERROR 26516 --- [nio-8090-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.mashosoft.AccountCommand.domain.events.model.AccountOpenedEvent to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer] with root cause
java.lang.ClassCastException: class com.mashosoft.AccountCommand.domain.events.model.AccountOpenedEvent cannot be cast to class java.lang.String (com.mashosoft.AccountCommand.domain.events.model.AccountOpenedEvent is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
我已经阅读了很多有关此错误的内容,但我并不真正理解它。 我也看到了这篇文章,但说实话,我仍然不太明白这个问题 https://www.baeldung.com/java-classcastException
我知道 maily 是一个由于继承而产生的问题,也可能是 lombok,但仍然不知道如何解决它
课程如下:
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public class AccountOpenedEvent extends BaseEvent {
private String accountHolder;
private Date creationDate;
private Double openingBalance;
}
和基本事件抽象类:
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
public abstract class BaseEvent {
private String id;
private int version;
}
kafka的配置如下:
kafka:
bootstrap-servers: localhost:9092
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
正如我所说,所有代码都在 github 链接上并且是公开的。您可以运行 docker compose,然后就像运行主类一样简单:
您可以在发送创建请求时遇到序列化错误,然后当kafka spring尝试序列化对象时失败
非常感谢您提前提供的所有帮助。
更新:如果我手动进行kafka模板配置,它就可以工作
@Configuration
public class KafkaProducerConfig {
@Bean
public KafkaTemplate<String, BaseEvent> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory(), true);
}
@Bean
public ProducerFactory<String, BaseEvent> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // Use JsonSerializer for complex objects
return new DefaultKafkaProducerFactory<>(configProps);
}
}
我仍然想知道我是否使用了错误的属性,或者我对属性做了什么错误