Quarkus Panache Transactional 如何正确关闭数据库连接

问题描述 投票:0回答:1

在并行处理大量交易时,我在 Panache Quarkus 中遇到交易问题

这是我的示例代码

public Uni<Void> executeLotsOfTransactions() {
    Multi<Integer> multi = Multi.createFrom().range(0, 1000);

    return multi
            .onItem().transformToUniAndMerge(number -> {
                return Uni.createFrom()
                        .deferred(() -> executeSomeTransaction(number))
                        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
                        .emitOn(MutinyHelper.executor(Vertx.currentContext()));
            })
            .collect().asList()
            .replaceWithVoid();
}

@Transactional
Uni<Void> executeSomeTransaction(Integer num) {
    Uni<Item> uniItem = someRepo.findSomeData(num); // basically find()
    return uniItem
            .chain(item -> {
                // do some calculation & update with item
                return someRepo.updateItem(item); // basically persist() or update() item
            })
            .onFailure().retry().withBackOff(Duration.ofSeconds(5)).atMost(3);
}

请注意,我使用

.deferred()
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
,因此 JTA 在工作池中启动

一些

someRepo.updateItem(item)
是成功的,但其中许多也因以下异常而失败:

javax.persistence.PersistenceException: org.hibernate.exception.GenericJDBCException: Unable to acquire JDBC Connection

这是回购代码:

@Override
public Uni<Void> updateItem(Item item) {
    return Uni.createFrom()
            .voidItem().invoke(() -> updateItemNonUni(item))
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}

@Transactional(REQUIRED)
void updateItemNonUni(Item item) {
    // this approach is not optimal, should rework
    Item itemFromDB = findById(item.getID());
    itemFromDB.setBalance(item.getBalance());
    itemFromDB.setStatus(item.getStatus());
    itemFromDB.setUpdatedDate(LocalDateTime.now());
    persist(itemFromDB);
}

我想一开始是因为我并行进行交易,所以我添加了重试

.onFailure()
但这似乎没有帮助,因为失败操作的数量并没有减少

所以我认为这是因为来自另一个

executeSomeTransaction
的连接尚未关闭,即使它已经完成了。

我认为这与 Uni 的工作方式有关,但我不能 100% 确定我在哪里做错了。

所以我的问题是,如何正确关闭数据库连接?

编辑: 我正在使用非反应式 Hibernate-ORM 和 Quarkus 版本 2.16

hibernate transactions quarkus quarkus-panache
1个回答
0
投票

根据@ozkanpakdil的评论,我尝试限制插入的并发性,现在它可以工作了。

这就是大致的解决方案

public Uni<Void> insertAmounts(List<SomeEntity> itemList) {
    return Multi.createFrom().iterable(ListUtils.partition(itemList, 2000))
            .emitOn(Infrastructure.getDefaultWorkerPool())
            .onItem().transformToUni(batchedList -> Uni.createFrom().voidItem()
                    .invoke(() -> InsertManyItemsNonUni(batchedList))
            )
            .merge(50)
            .collect().asList()
            .replaceWithVoid()
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}

void InsertManyItemsNonUni(List<SomeEntity> itemList) {
    if (QuarkusTransaction.isActive()) {
        QuarkusTransaction.joiningExisting();
    } else {
        QuarkusTransaction.begin();
    }
    repositoryWriter.persist(itemList);
    QuarkusTransaction.commit();
}

注意:我的最大连接池是 100

© www.soinside.com 2019 - 2024. All rights reserved.