我有一个定期批量接收消息的服务,然后对每条消息执行一个事务。
这是示例代码:
// scheduled to run every 5 second
private Uni<Void> receiveMessage() {
Uni<List<Message>> uniListMessage = messageService.get();
return uniListMessage
.map(listMessage -> {
final UniJoin.Builder<Void> uniBuilder = Uni.join().builder();
listMessage.forEach(message -> {
Uni<Void> uniOperation = businessService.doOperation(message);
uniBuilder.add(uniOperation);
});
return uniBuilder.joinAll().andCollectFailures()
.replaceWithVoid();
});
}
在
BusinessService
:
@ReactiveTransactional
public Uni<Void> doOperation(Message message) {
// do find query, insert, update, etc to multiple tables
Uni<Fruit> uniFruit = fruitRepo.findByMessage(message);
return uniFruit
.map(fruit -> {
// calculate things
final UniJoin.Builder<Void> uniBuilder = Uni.join().builder();
Uni<Apple> uniApple = appleRepo.doUpdate(fruit);
Uni<Melon> uniMelon = melonRepo.doUpdate(fruit);
return uniJoinBuilder.joinAll().andFailFast();
});
}
appleRepo.doUpdate()
和melonRepo.doUpdate
没有@ReactiveTransactional
注释,只使用.persist()
做
.update()
或
PanacheRepositoryBase
我知道他们建议我们将
@ReactiveTransactional
放在 REST 端点控制器上,但我没有使用 REST,所以我不确定如何正确执行此操作
我运行一些测试,如果我只提交 1 条消息,一切正常
但是,如果我有大约 100 条消息,事情就会开始变得奇怪,例如我收到:
NoStackTraceThrowable: Transaction already completed
java.lang.IllegalStateException: HR000061: Session is currently connecting to database
io.vertx.mysqlclient.MySQLBatchException: Error occurs during batch execution
而且很少
index out of bound 1 out of 0
我之前使用过 classic-orm,但遇到了麻烦,因为 @Transactional 不能用于返回的函数
Uni
会话不能在不同的管道中并行使用。 操作的顺序必须以可预测的方式发生。 这意味着您无法使用
.joinAll
。
您还使用
.map
而不是 .call
(或 .chain)。
这应该有效:
// scheduled to run every 5 second
private Uni<Void> receiveMessage() {
return messageService.get()
// You have to use .call or .chain here (not .map)
.call(listMessage -> {
Uni<Void> uniResult = Uni.createFrom().voidItem();
for (Message message : listMessage ) {
uniResult = uniResult.call(businessService::doOperation);
}
return uniResult;
});
}