我正在使用“axon 4.0.3 + Spring Boot 2 + Spring Data(PostgreSQL)”默认配置。
将事件发布到EventStore并等待@SagaEventHandler捕获它后,我收到以下异常:
javax.persistence.TransactionRequiredException:没有可用于当前线程的实际事务的EntityManager - 无法在org.springframework.orm.jpa.SharedEntityManagerCreator $ SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:292)〜[spring-orm-]中可靠地处理'persist'调用5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.sun.proxy。$ Proxy104.persist(Unknown Source)〜[na:na] at java.util.stream.ForEachOps $ ForEachOp $ OfRef.accept( ForEachOps.java:184)~ [na:1.8.0_191] at java.util.stream.ReferencePipeline $ 3 $ 1.accept(ReferencePipeline.java:193)〜[na:1.8.0_191] at java.util.ArrayList $ ArrayListSpliterator。 forEachRemaining(ArrayList.java:1382)〜[na:1.8.0_191] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)〜[na:1.8.0_191] at java.util.stream.AbstractPipeline。在java.util.stream的java.util.stream.ForEachOps $ ForEachOp.evaluateSequential(ForEachOps.java:151)〜[na:1.8.0_191]中的wrapAndCopyInto(AbstractPipeline.java:471)〜[na:1.8.0_191]。 ForEachOps $ F orEachOp $ OfRef.evaluateSequential(ForEachOps.java:174)〜[na:1.8.0_191] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)〜[na:1.8.0_191] at java.util。在org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:276)〜[axon-eventsourcing-4.0.3]中的stream.ReferencePipeline.forEach(ReferencePipeline.java:418)〜[na:1.8.0_191] .jar:4.0.3] org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:98)〜[axon-eventsourcing-4.0.3.jar:4.0.3]
EventStore需要哪些其他配置来处理这种情况?
PS。在方法上添加@Transactional解决了这个问题,但我不明白为什么这是必要的。
最小代码示例(以下端点127.0.0.1:8080/1已启用,但另一个127.0.0.1:8080/1未启用):
@SpringBootApplication
class TestAxonApplication
class UserId(val userId: String = IdentifierFactory.getInstance().generateIdentifier()) : Serializable
class TestCommand(@TargetAggregateIdentifier val userId: UserId)
class TestedEvent(val userId: UserId)
fun main(args: Array<String>) {
runApplication<TestAxonApplication>(*args)
}
@RestController
@RequestMapping
class Controller(var commandGateway: CommandGateway, var eventStore: EventStore) {
@GetMapping("/1")
fun done(): UserId? {
return commandGateway.sendAndWait<UserId>(TestCommand(UserId()))
}
@GetMapping("/2")
fun failure() {
eventStore.publish(
GenericEventMessage.asEventMessage<Void>(
TestedEvent(UserId())
)
)
}
}
@Aggregate
class User() {
@AggregateIdentifier
private lateinit var userId: UserId
@CommandHandler
constructor(cmd: TestCommand) : this() {
AggregateLifecycle.apply(TestedEvent(cmd.userId))
}
@EventHandler
fun on(event: TestedEvent) {
this.userId = event.userId
}
}
@Saga
@ProcessingGroup("mySaga")
class MySaga {
@StartSaga
@SagaEventHandler(associationProperty = "userId")
fun start(event: TestedEvent) {
println("DONE ${event.userId.userId}")
}
}
调用之间的区别在于,一个通过命令总线,而另一个跳过它并直接发布到事件总线。默认情况下,在命令总线上配置TransactionManager
。但是,事件总线并非如此。
这意味着您发布的事件没有事务处于活动状态。 Hibernate不喜欢这样。
解决方案是将@Transactional
放在您的端点上,以确保在存储事件时事务处于活动状态。
看起来您忘记设置命令总线以允许事务管理。像下面的示例一样添加它,它将工作:
@Bean
public CommandBus commandBus(TransactionManager transactionManager) {
return new SimpleCommandBus(transactionManager, NoOpMessageMonitor.INSTANCE);
}
更新
我的错,首先我用来使用Axon 3.3(不是最新的),我以为你在为事件存储使用自定义配置。
Axon spring boot默认使用inmemory事件存储,这就是为什么如果你没有定义自定义事件,那么事务将无法工作。
这是您的配置中缺少的内容:
@Bean
public EventStorageEngine eventStorageEngine(EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) {
return new JpaEventStorageEngine(entityManagerProvider, transactionManager);
}
@Bean
public PlatformTransactionManager transactionManager(EntityManagerFactory emf) {
return new JpaTransactionManager(emf);
}
´´´
这是我为其他提案做的一个简单示例,它完美无缺。
要牢记
另一个有趣的一点是,您必须定义聚合存储库,因为Axon会尝试查找与定义的聚合匹配的存储库。
@Bean
public Repository<User> documentAggregateRepository(EventStore eventStore) {
return new EventSourcingRepository<>(User.class, eventStore);
}
对于传奇来说,你必须注册它们,否则传奇永远不会被触发
// remember to call the method sagaName + Configuration
// or you must set up the @Saga configurationBean name pointing this method
@Bean
public SagaConfiguration<MySaga> mySagaConfiguration() {
return SagaConfiguration.subscribingSagaManager(MySaga.class);
}
在这个示例中是事件源,但如果您对遗留迁移感兴趣,可能会在迁移过程中使用GenericJpaRepository(如果您需要一个示例,请告诉我)。
HTH。