向Saga发布事件时,没有实体事务的EntityManager

问题描述 投票:0回答:2

我正在使用“axon 4.0.3 + Spring Boot 2 + Spring Data(PostgreSQL)”默认配置。

将事件发布到EventStore并等待@SagaEventHandler捕获它后,我收到以下异常:

javax.persistence.TransactionRequiredException:没有可用于当前线程的实际事务的EntityManager - 无法在org.springframework.orm.jpa.SharedEntityManagerCreator $ SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:292)〜[spring-orm-]中可靠地处理'persist'调用5.1.5.RELEASE.jar:5.1.5.RELEASE] at com.sun.proxy。$ Proxy104.persist(Unknown Source)〜[na:na] at java.util.stream.ForEachOps $ ForEachOp $ OfRef.accept( ForEachOps.java:184)~ [na:1.8.0_191] at java.util.stream.ReferencePipeline $ 3 $ 1.accept(ReferencePipeline.java:193)〜[na:1.8.0_191] at java.util.ArrayList $ ArrayListSpliterator。 forEachRemaining(ArrayList.java:1382)〜[na:1.8.0_191] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)〜[na:1.8.0_191] at java.util.stream.AbstractPipeline。在java.util.stream的java.util.stream.ForEachOps $ ForEachOp.evaluateSequential(ForEachOps.java:151)〜[na:1.8.0_191]中的wrapAndCopyInto(AbstractPipeline.java:471)〜[na:1.8.0_191]。 ForEachOps $ F orEachOp $ OfRef.evaluateSequential(ForEachOps.java:174)〜[na:1.8.0_191] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)〜[na:1.8.0_191] at java.util。在org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:276)〜[axon-eventsourcing-4.0.3]中的stream.ReferencePipeline.forEach(ReferencePipeline.java:418)〜[na:1.8.0_191] .jar:4.0.3] org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:98)〜[axon-eventsourcing-4.0.3.jar:4.0.3]

EventStore需要哪些其他配置来处理这种情况?

PS。在方法上添加@Transactional解决了这个问题,但我不明白为什么这是必要的。

最小代码示例(以下端点127.0.0.1:8080/1已启用,但另一个127.0.0.1:8080/1未启用):

@SpringBootApplication
class TestAxonApplication

class UserId(val userId: String = IdentifierFactory.getInstance().generateIdentifier()) : Serializable

class TestCommand(@TargetAggregateIdentifier val userId: UserId)

class TestedEvent(val userId: UserId)

fun main(args: Array<String>) {
    runApplication<TestAxonApplication>(*args)
}

@RestController
@RequestMapping
class Controller(var commandGateway: CommandGateway, var eventStore: EventStore) {

    @GetMapping("/1")
    fun done(): UserId? {
        return commandGateway.sendAndWait<UserId>(TestCommand(UserId()))
    }

    @GetMapping("/2")
    fun failure() {
        eventStore.publish(
                GenericEventMessage.asEventMessage<Void>(
                        TestedEvent(UserId())
                )
        )
    }

}

@Aggregate
class User() {

    @AggregateIdentifier
    private lateinit var userId: UserId

    @CommandHandler
    constructor(cmd: TestCommand) : this() {
        AggregateLifecycle.apply(TestedEvent(cmd.userId))
    }

    @EventHandler
    fun on(event: TestedEvent) {
        this.userId = event.userId
    }

}

@Saga
@ProcessingGroup("mySaga")
class MySaga {

    @StartSaga
    @SagaEventHandler(associationProperty = "userId")
    fun start(event: TestedEvent) {
        println("DONE ${event.userId.userId}")
    }

}
spring-boot jpa event-sourcing saga axon
2个回答
2
投票

调用之间的区别在于,一个通过命令总线,而另一个跳过它并直接发布到事件总线。默认情况下,在命令总线上配置TransactionManager。但是,事件总线并非如此。

这意味着您发布的事件没有事务处于活动状态。 Hibernate不喜欢这样。

解决方案是将@Transactional放在您的端点上,以确保在存储事件时事务处于活动状态。


0
投票

看起来您忘记设置命令总线以允许事务管理。像下面的示例一样添加它,它将工作:

@Bean
public CommandBus commandBus(TransactionManager transactionManager) {
    return new SimpleCommandBus(transactionManager, NoOpMessageMonitor.INSTANCE);
}

更新

我的错,首先我用来使用Axon 3.3(不是最新的),我以为你在为事件存储使用自定义配置。

Axon spring boot默认使用inmemory事件存储,这就是为什么如果你没有定义自定义事件,那么事务将无法工作。

这是您的配置中缺少的内容:

@Bean
public EventStorageEngine eventStorageEngine(EntityManagerProvider entityManagerProvider, TransactionManager transactionManager) {
    return new JpaEventStorageEngine(entityManagerProvider, transactionManager);
}

@Bean
public PlatformTransactionManager transactionManager(EntityManagerFactory emf) {
    return new JpaTransactionManager(emf);
}

´´´

这是我为其他提案做的一个简单示例,它完美无缺。

要牢记

另一个有趣的一点是,您必须定义聚合存储库,因为Axon会尝试查找与定义的聚合匹配的存储库。

@Bean
public Repository<User> documentAggregateRepository(EventStore eventStore) {
    return new EventSourcingRepository<>(User.class, eventStore);
}

对于传奇来说,你必须注册它们,否则传奇永远不会被触发

// remember to call the method sagaName + Configuration
// or you must set up the @Saga configurationBean name pointing this method
@Bean
public SagaConfiguration<MySaga> mySagaConfiguration() {
    return SagaConfiguration.subscribingSagaManager(MySaga.class);
}

在这个示例中是事件源,但如果您对遗留迁移感兴趣,可能会在迁移过程中使用GenericJpaRepository(如果您需要一个示例,请告诉我)。

HTH。

© www.soinside.com 2019 - 2024. All rights reserved.