此线程是 Github 问题的延续:https://github.com/spring-projects/spring-data-r2dbc/issues/194
背景:
嗨,
我刚刚尝试了一个非常简单的示例,基于两个反应性存储库:
给出
br
,一个 r2dbc crud 存储库,以及 cr
,另一个 r2dbc crud 存储库:
br.findAll()
.flatMap(br -> {
return cr.findById(br.getPropertyOne())
.doOnNext(c -> br.setProperty2(c))
.thenReturn(br);
})
.collectList().block();
此代码示例永远不会完成(只有前 250 个左右的条目到达
.collectList
运算符)。经过一番挖掘,在 onBackpressureXXX
之后添加一些 findAll
运算符似乎可以通过......删除元素或缓冲它们来“解决”问题。
在这一点上,我的理解是 r2dbc 反应存储库不使用消费者反馈机制,这消除了 r2dbc 的很大一部分好处。
我错了吗?有没有更好的方法来实现同样的目标?
谢谢!
来自@mp911de的建议:
作为一般规则,避免在另一个流处于活动状态时创建流(名言:不要交叉流)。
如果您想获取相关数据,那么理想情况下将所有结果收集为列表,然后运行子查询。这样,初始响应流就会被消耗,并且连接可以自由地获取额外的结果。
像下面的代码片段应该可以完成这项工作:
br.findAll().collectList()
.flatMap(it -> {
List<Mono<Reference>> refs = new ArrayList<>();
for (Person p : it) {
refs.add(cr.findById(br.getPropertyOne()).doOnNext(…));
}
return Flux.concat(refs).thenReturn(it);
});
但这消除了流式传输数据而不将其全部保留在内存中的好处(我的最后一步不是列出,而是流式写入以输出到某个文件)。
这方面有什么帮助吗?
我们遇到了同样的问题,github 线程引导我走向正确的道路。除了寻找以下两件事之外,没有解决此问题的灵丹妙药:
尤其是最后一点不太好找。对于OP,它是一个连续提供数据的
findAll()
(流1)和一个flatMap
,它在读取每个发出的结果时同时修改数据(流2)。
就我而言,情况有所不同。路由原因是在 flatMap
内执行
flatMap
。当您传递
Publisher
作为参数时,可能很难找到它。假设您有以下代码:
@Transactional
public Mono<Render> saveInPostgres(final Long sequenceNumber,
final Mono<SomeComputeResult> previousOperations) {
previousOperations.flatMap(computeResult -> {
computeResult.updateSequenceNumber(sequenceNumber);
Render render = computeResult.finalize();
final Mono<Render> saved = renderRepository.save(entity);
...
}
我传递一个 Mono
,然后通过
flatMap
执行。更新序列号后,该方法将对象保存在存储库中。当然,导致
Mono
的所有操作现在都会在您映射时执行。你在这里看不到的是:
saveInPostgres
本身在
flatMap
中被调用
previousOperations.flatMap()
运行时出错,即异常
解决问题的提示
所以我的 2 美分,如果你有Publisher
作为参数,请将它们删除。始终链接映射执行,而不是传递它们并可能在流中创建流。这与OP问题本质上是一样的。通过调用
collectList()
,他能够链接操作,因为
findAll()
是在通过第二个
flatMap
对元素进行任何其他修改之前完成的。