Micronaut Data MongoDB reactive transactions


Dependencies included:

implementation("io.micronaut.data:micronaut-data-mongodb")
implementation("io.micronaut.mongodb:micronaut-mongo-reactive")
implementation("io.micronaut.reactor:micronaut-reactor")
runtimeOnly("org.mongodb:mongodb-driver-reactivestreams")

Repository interfaces:

@MongoRepository
public interface DepartmentRepository extends ReactiveStreamsPageableRepository<Department, String> {

}
@MongoRepository
public interface OrganisationRepository extends ReactiveStreamsPageableRepository<Organisation, String> {

}

The out-of-the-box (OOTB) class in which I found the transaction-related implementation:

io.micronaut.data.mongodb.operations.DefaultReactiveMongoRepositoryOperations

Transactional service method:

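In the method below I added the @Transactional annotation. Inside the method I call the update operation on two repositories. To combine them, I use zip to subscribe to both and return once they complete.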

  @Override
  @Transactional
  public Mono<Boolean> updateCounts( final MessageDto message ) {
    // do something

    // update department reactive
    final Mono<Department> departmentUpdate = Mono.from( departmentRepository.findById( departmentId ) )
        .flatMap( department -> {
          // do something
          return Mono.from( departmentRepository.update( department ) );
        } );
    log.info( "Department update mono created" );

    // update organization reactive
    final Mono<Organisation> organizationUpdate = Mono.from( organisationRepository.findById( orgId ) )
        .flatMap( org -> {
          // do something
          return Mono.from( organisationRepository.update( org ) );
        } );
    log.info( "Org update mono created" );

    // zip and respond
    return Mono.zip( departmentUpdate, organizationUpdate ).map( tuple -> {
      return Boolean.TRUE;
    } );
  }

Exception:

11:22:22.635 [Thread-10] WARN  i.m.d.m.o.DefaultReactiveMongoRepositoryOperations - Rolling back transaction on error: Query failed with error code 251 and error message 'Given transaction number 2 does not match any in-progress transactions. The active transaction number is 1' on server localhost:27017
com.mongodb.MongoQueryException: Query failed with error code 251 and error message 'Given transaction number 2 does not match any in-progress transactions. The active transaction number is 1' on server localhost:27017
    at com.mongodb.internal.operation.FindOperation$2.onResult(FindOperation.java:787)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$transformingReadCallback$10(CommandOperationHelper.java:322)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:272)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
    at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:82)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:683)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:159)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:48)
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:523)
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:498)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:821)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:785)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:645)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:642)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129)
    at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:160)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:573)
    at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:276)
    at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:297)
    at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:144)
    at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:118)
    at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:107)
    at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:642)
    at com.mongodb.internal.connection.InternalStreamConnection.access$600(InternalStreamConnection.java:86)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:775)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:760)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:645)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:642)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:250)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:233)
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:129)
    at java.base/sun.nio.ch.Invoker$2.run(Invoker.java:221)
    at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:113)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

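Judging from the exception, the transaction appears to be closed on the MongoDB side even though the application transaction is still active. Possibly the MongoDB ClientSession is closed after the first update, but it is not clear to me how to resolve this.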

Has anyone tried this? Does Micronaut Data MongoDB with Reactive Streams (Project Reactor) support transactions as well?
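
One thing that may be worth ruling out: Mono.zip subscribes to all of its sources, so the department and organisation find/update chains can be in flight at the same time. Whether that overlap is what triggers error 251 here is an assumption, not something confirmed above. The sketch below is a JDK-only model of the difference between combining and chaining. It does not use Reactor or MongoDB: CompletableFuture.thenCombine stands in for Mono.zip, thenCompose stands in for flatMap, and FakeSession with its counters is entirely hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class SessionOverlapDemo {

  // Hypothetical stand-in for a session shared by all operations of one transaction.
  static final class FakeSession {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    // Simulates one find-and-update round trip and records how many operations overlap.
    String update( final String name ) {
      final int now = inFlight.incrementAndGet();
      maxInFlight.accumulateAndGet( now, Math::max );
      try {
        Thread.sleep( 200 ); // pretend network latency
      } catch ( InterruptedException e ) {
        Thread.currentThread().interrupt();
      } finally {
        inFlight.decrementAndGet();
      }
      return name;
    }

    int maxInFlight() {
      return maxInFlight.get();
    }
  }

  // Zip-like: both operations are started before either has finished.
  static int combined( final ExecutorService pool ) {
    final FakeSession session = new FakeSession();
    final CompletableFuture<String> dept =
        CompletableFuture.supplyAsync( () -> session.update( "department" ), pool );
    final CompletableFuture<String> org =
        CompletableFuture.supplyAsync( () -> session.update( "organisation" ), pool );
    dept.thenCombine( org, ( d, o ) -> Boolean.TRUE ).join();
    return session.maxInFlight();
  }

  // Chained: the second operation starts only after the first has completed.
  static int chained( final ExecutorService pool ) {
    final FakeSession session = new FakeSession();
    CompletableFuture.supplyAsync( () -> session.update( "department" ), pool )
        .thenCompose( d -> CompletableFuture.supplyAsync( () -> session.update( "organisation" ), pool ) )
        .thenApply( o -> Boolean.TRUE )
        .join();
    return session.maxInFlight();
  }

  public static void main( final String[] args ) {
    final ExecutorService pool = Executors.newFixedThreadPool( 2 );
    try {
      System.out.println( "combined max in flight: " + combined( pool ) );
      System.out.println( "chained max in flight: " + chained( pool ) );
    } finally {
      pool.shutdown();
    }
  }
}
```

In Reactor terms, the chained form would correspond to something like departmentUpdate.then( organizationUpdate ).thenReturn( Boolean.TRUE ) in place of Mono.zip. I have not verified that this avoids the error with Micronaut Data.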

mongodb reactive-programming project-reactor micronaut micronaut-data