我正在做一个基于事件的系统,它由通过某种领域事件进行通信的多个服务组成。所有服务都在 Spring Boot 3.1.3 上,并通过 activemq artemis 发送消息。
我有一些接口和通用代码来统一所有服务的行为:
interface DomainEvent
data class DomainEventContainer<D : DomainEvent>(
val event: D,
val type: DomainEventType,
val createdBy: DomainEventOrigin,
val createdAt: Instant
)
fun interface BaseDomainEventProducer<T : DomainEvent> {
fun produce(event: T): DomainEventContainer<T>?
}
fun interface BaseDomainEventConsumer<T : DomainEvent> {
fun consume(container: DomainEventContainer<T>)
}
在制作人方面,我这样做:
fun produce(event: DomainEvent) {
// some logic here
val container = DomainEventContainer(event, type, DomainEventOrigin.MY_SERVICE, Instant.now())
return try {
log.info("Sending event $container")
jmsTemplate.convertAndSend(destination, container)
container
} catch (e: Exception) {
log.info("Error during sending event $container ", e)
null
}
}
在接收方,我正在尝试这样做:
@JmsListener(destination = "\${random.artemis.queue}")
override fun consume(message: DomainEventContainer<MyCustomDomainEvent>) {
eventHandler.handle(container.event)
}
问题是 jms 内部没有足够的信息来猜测
MyCustomDomainEvent
类型,并且它无法为我的 jms 监听器取消实现这样的消息,因为错误而失败:
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `DomainEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
我可以使用
jakarta.jms.Message
自己对消息进行反现实化,如下所示:
@JmsListener(destination = "\${random.artemis.queue}")
override fun consume(message: Message) {
val container = objectMapper.readValue(
message.getBody(String::class.java),
object : TypeReference<DomainEventContainer<EmploymentRegisteredDomainEvent>>(){})
)
eventHandler.handle(container.event)
}
或者像这样使用
org.springframework.messaging.Message
:
@JmsListener(destination = "\${random.artemis.queue}")
override fun consume(message: Message<DomainEventContainer<MyCustomDomain>>) {
val container = message.payload
eventHandler.handle(container.event)
}
但是所有这些选项对我来说看起来都很奇怪,因为我必须向每个侦听器添加转换逻辑,这应该是平台代码所关心的问题。 我错过了什么吗?我可以配置 spring 按照我需要的方式反序列化消息吗?
附注我记得 Spring Boot 的 Rabbit/amqp 部分完全按照我需要的方式工作,因此我们可以使用
DomainEventContainer<MyCustomDomain>
和 RabbitTemplate
发送我的消息,并期望像这样收听它:
@RabbitListener(queues = ["random.rabbit.queue"])
fun consume(event: DomainEvent<MyCustomDomain>)
而且效果很好。不知道为什么它在 spring 的 jms 内部以这种方式工作。
您需要在
MappingJackson2MessageConverter
和 JmsTemplate
上配置 AbstractJmsListenerContainerFactory
。并将其 typeIdPropertyName
设置为您键入的内容以分别正确序列化和反序列化。