Data dependencies included:
implementation("io.micronaut.data:micronaut-data-mongodb")
implementation("io.micronaut.mongodb:micronaut-mongo-reactive")
implementation("io.micronaut.reactor:micronaut-reactor")
runtimeOnly("org.mongodb:mongodb-driver-reactivestreams")
Repository interfaces:
@MongoRepository
public interface DepartmentRepository extends ReactiveStreamsPageableRepository<Department, String> {
}
@MongoRepository
public interface OrganisationRepository extends ReactiveStreamsPageableRepository<Organisation, String> {
}
The OOTB class in which I found the transaction-related implementation:
io.micronaut.data.mongodb.operations.DefaultReactiveMongoRepositoryOperations
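Transactional service method:
In the method below I added the @Transactional annotation. Inside it I call two repository update operations. To combine them, I use Mono.zip to subscribe to both and return once they have completed.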
@Override
@Transactional
public Mono<Boolean> updateCounts( final MessageDto message ) {
// do something
// update department reactive
final Mono<Department> departmentUpdate = Mono.from( departmentRepository.findById( departmentId ) )
.flatMap( department -> {
// do something
return Mono.from( departmentRepository.update( department ) );
} );
log.info( "Department update mono created" );
// update organization reactive
final Mono<Organisation> organizationUpdate = Mono.from( organisationRepository.findById( orgId ) )
.flatMap( org -> {
// do something
return Mono.from( organisationRepository.update( org ) );
} );
log.info( "Org update mono created" );
// zip and respond
return Mono.zip( departmentUpdate, organizationUpdate ).map( tuple -> {
return Boolean.TRUE;
} );
}
Exception:
11:22:22.635 [Thread-10] WARN i.m.d.m.o.DefaultReactiveMongoRepositoryOperations - Rolling back transaction on error: Query failed with error code 251 and error message 'Given transaction number 2 does not match any in-progress transactions. The active transaction number is 1' on server localhost:27017
com.mongodb.MongoQueryException: Query failed with error code 251 and error message 'Given transaction number 2 does not match any in-progress transactions. The active transaction number is 1' on server localhost:27017
at com.mongodb.internal.operation.FindOperation$2.onResult(FindOperation.java:787)
at com.mongodb.internal.operation.CommandOperationHelper.lambda$transformingReadCallback$10(CommandOperationHelper.java:322)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:272)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:82)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:683)
at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:159)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:523)
at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:498)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:821)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:785)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:645)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:642)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129)
at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:160)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:573)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297)
at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:144)
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:118)
at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:107)
at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:642)
at com.mongodb.internal.connection.InternalStreamConnection.access$600(InternalStreamConnection.java:86)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:775)
at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:760)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:645)
at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:642)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129)
at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:221)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
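Judging from the exception, it looks like the transaction is being closed on the MongoDB side even though the application transaction is still active. Possibly the MongoDB ClientSession is closed after the first update, but I am not sure how to resolve this.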
Has anyone tried this? Does Micronaut Data MongoDB with Reactive Streams (Project Reactor) also support transactions?
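To make my suspicion more concrete: Mono.zip subscribes to all of its sources eagerly, so both find/update chains can be in flight at the same time, whereas a chained flatMap would start the second only after the first has finished. Below is a minimal plain-Java sketch of that difference. It is not Micronaut or MongoDB code: FakeSession, zipLike and sequential are hypothetical names, CompletableFuture stands in for Mono, and the sketch only counts how many operations overlap on one session. Whether this overlap is what actually triggers error 251 is an assumption on my part.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

class SessionOverlapSketch {

    /** Hypothetical stand-in for a session; it only records how many operations overlap. */
    static final class FakeSession {
        private int inFlight;
        private int maxInFlight;
        // Responses that have not "arrived" yet; each entry completes one operation.
        private final Deque<Runnable> pending = new ArrayDeque<>();

        /** Starts an operation whose result stays pending until drain() delivers it. */
        CompletableFuture<String> update(String name) {
            inFlight++;
            maxInFlight = Math.max(maxInFlight, inFlight);
            CompletableFuture<String> result = new CompletableFuture<>();
            pending.add(() -> {
                inFlight--;
                result.complete(name);
            });
            return result;
        }

        /** Simulates the server responses arriving one after another. */
        void drain() {
            Runnable response;
            while ((response = pending.poll()) != null) {
                response.run();
            }
        }
    }

    /** Both operations are started before either has completed (zip-like). */
    static int zipLike() {
        FakeSession session = new FakeSession();
        CompletableFuture<String> department = session.update("department");
        CompletableFuture<String> organisation = session.update("organisation");
        CompletableFuture<Boolean> done =
                department.thenCombine(organisation, (d, o) -> Boolean.TRUE);
        session.drain();
        done.join();
        return session.maxInFlight;
    }

    /** The second operation is started only after the first has completed. */
    static int sequential() {
        FakeSession session = new FakeSession();
        CompletableFuture<Boolean> done = session.update("department")
                .thenCompose(d -> session.update("organisation"))
                .thenApply(o -> Boolean.TRUE);
        session.drain();
        done.join();
        return session.maxInFlight;
    }

    public static void main(String[] args) {
        System.out.println("zip-like max in flight:   " + zipLike());
        System.out.println("sequential max in flight: " + sequential());
    }
}
```

In this model the zip-like variant reports two overlapping operations and the sequential variant reports one. If overlap really is the problem, the equivalent change in my service would be to chain the organisation update inside a flatMap after the department update instead of zipping the two, but I have not confirmed that this is how the transaction is meant to be used.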