Quarkus 反应式风格如何在非 REST 端点中使用 @ReactiveTransactional 注释

问题描述 投票:0回答:1

我有一个定期批量接收消息的服务,然后对每条消息执行一个事务。

这是示例代码:

// scheduled to run every 5 second
private Uni<Void> receiveMessage() {
    Uni<List<Message>> uniListMessage = messageService.get();
    
    return uniListMessage
            .map(listMessage -> {
                final UniJoin.Builder<Void> uniBuilder = Uni.join().builder();
                listMessage.forEach(message -> {
                    Uni<Void> uniOperation = businessService.doOperation(message);
                    uniBuilder.add(uniOperation);
                });
                
                return uniBuilder.joinAll().andCollectFailures()
                        .replaceWithVoid();
            });
}

BusinessService

@ReactiveTransactional
public Uni<Void> doOperation(Message message) {
    // do find query, insert, update, etc to multiple tables
    Uni<Fruit> uniFruit = fruitRepo.findByMessage(message);
    
    return uniFruit
            .map(fruit -> {
                // calculate things
                final UniJoin.Builder<Void> uniBuilder = Uni.join().builder();
                
                Uni<Apple> uniApple = appleRepo.doUpdate(fruit);
                Uni<Melon> uniMelon = melonRepo.doUpdate(fruit);

                return uniJoinBuilder.joinAll().andFailFast();
            });
}

appleRepo.doUpdate()
melonRepo.doUpdate
没有
@ReactiveTransactional
注释,只使用
.persist()
.update()
PanacheRepositoryBase

我知道他们建议我们将

@ReactiveTransactional
放在 REST 端点控制器上,但我没有使用 REST,所以我不确定如何正确执行此操作

我运行一些测试,如果我只提交 1 条消息,一切正常

但是,如果我有大约 100 条消息,事情就会开始变得奇怪,例如我收到:

NoStackTraceThrowable: Transaction already completed
java.lang.IllegalStateException: HR000061: Session is currently connecting to database
io.vertx.mysqlclient.MySQLBatchException: Error occurs during batch execution

而且很少

index out of bound 1 out of 0

我之前使用过 classic-orm,但遇到了麻烦,因为 @Transactional 不能用于返回的函数

Uni

java quarkus quarkus-panache mutiny hibernate-reactive
1个回答
0
投票

会话不能在不同的管道中并行使用。 操作的顺序必须以可预测的方式发生。 这意味着您无法使用

.joinAll

您还使用

.map
而不是
.call
(或 .chain)。

这应该有效:

// scheduled to run every 5 second
private Uni<Void> receiveMessage() {  
    return messageService.get()
            // You have to use .call or .chain here (not .map)
            .call(listMessage -> {
                Uni<Void> uniResult = Uni.createFrom().voidItem();
                for (Message message : listMessage ) {
                       uniResult = uniResult.call(businessService::doOperation);
                }
                return uniResult;
            });
}
© www.soinside.com 2019 - 2024. All rights reserved.