我正在尝试使用反应式 r2dbc 创建批处理并使用事务来注释该方法。 但看起来如果我同时使用
@Transactional
和批处理代码,代码就会挂起并且不起作用。
以下是代码:
@Transactional
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
long startTime = System.currentTimeMillis();
Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch() /* **Creating batch***/
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
//.execute())); /* **Execution batch***/
.execute())).as(StepVerifier::create)
.expectNextCount(3) /* **Expect count batch***/
.verifyComplete(); /* **Verify batch***/
LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
return null;
}
你正在破坏反应链。
在反应式编程中在您订阅之前什么都不会发生。
那么这意味着什么,我可以用一个小例子来展示它。
// If running this, nothing happens
Mono.just("Foobar");
同时:
Mono.just("Foobar").subscribe(s -> System.out.println(s));
将打印:
Foobar
如果您有函数,这也适用
public void getString() {
Mono.just("Foobar");
}
// Nothing happens, you have declared something
// but it will never get run, no one is subscribing
getString();
你需要做什么:
public Mono<String> getString() {
// This could be saving to a database or anything, this will now get run
return Mono.just("Now this code will get run");
}
// The above got run, we can prove it by printing
getString().subscribe(s -> System.out.println(s));
那么到底发生了什么?好吧,在反应式编程中,一旦有人订阅了 Mono 或 Flux,程序就会向上遍历并构建回调链,直到找到开始生成值的生产者(在我的例子中是 just 语句)。这个阶段称为“组装阶段”。此阶段完成后,反应链将开始为订阅者产生价值。
如果没有人订阅,则不会建立任何链。
那么订阅者是谁?它通常是价值的最终消费者。例如,发起呼叫的网页或移动应用程序,但如果它是发起呼叫的服务(例如在 cron 作业中),也可以是您的 Spring Boot 服务。
让我们看看你的代码:
@Transactional /* **Transaction** */
@GetMapping("/batchFetchData")
public Flux<Object> batchFetch() {
long startTime = System.currentTimeMillis();
// Here you declare a Mono but ignoring the return type so breaking the reactive chain
Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch() /* **Creating batch***/
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
//.execute())); /* **Execution batch***/
.execute())).as(StepVerifier::create)
.expectNextCount(3) /* **Expect count batch***/
.verifyComplete(); /* **Verify batch***/
// Here at the end you have no subscriber
LOGGER.info("Time taken to batchFetch "+(System.currentTimeMillis() - startTime));
// Null is not allowed in reactive chains
return null;
}
那么如何解决呢?
你需要不要破坏反应链。这是基本的反应式编程。
@Transactional
@GetMapping("/batchFetchData")
public Mono<Void> batchFetch() {
long startTime = System.currentTimeMillis();
// we return here so that the calling client
// can subscribe and start the chain
return Mono.from(databaseConfiguration.connectionFactory().create())
.flatMapMany(connection -> Flux.from(connection
.createBatch()
.add("SELECT * FROM xtable where xId = 232323")
.add("SELECT * FROM ytable where yId = 454545")
.add("SELECT * FROM ztable where zId = 676767")
.execute()))
.then();
// then() statement throws away whatever the return
// value is and just signals to the calling client
// when everything is done.
}
“我不想退回任何东西”
这就是
Mono#then
声明的用途。您会看到,当链中的每个部分完成时,它都会发出完成信号,然后将值从一个部分传递到下一个部分,然后再次发出信号,并传递值等等。当我们到达 then
语句时,它只会发出信号COMPLETE 并且不返回任何内容(或者实际上它返回一个 Mono<Void>
,因为反应链中不允许 null)。您必须始终返回,以便每个步骤都可以传递其 COMPLETE 信号。
我还删除了您在代码中使用的 StepVerifier,因为它通常用于验证单元测试中的步骤,而不是在生产代码中使用。您可以在这里阅读更多相关信息StepVerifier。
如果你想学习反应式编程,我建议你这样做,因为它太神奇了,我喜欢它,我强烈建议你阅读优秀的反应器文档反应式编程简介,他们将解释以下概念:什么都不会发生,直到您订阅等
您的问题是:
return null;
您应该在反应式应用程序中返回 Mono/Flux,即使流程中没有项目,也应返回
Mono.empty()
/Flux.empty()
。
查看我的示例插入多条记录。
测试时,使用StepVerify来验证结果。
对于WebFlux应用程序中的事务支持,您必须阅读相关文档以检查它是否像一般本地事务一样受到良好支持或使用它时的限制。
如果支持良好,有两种使用交易的方法。
TransactionalOperator
(类似于传统的TransactionTemplate
)来包装你的业务逻辑。@Transaction
注释。