Spring Data R2DBC - 未考虑背压?

问题描述 投票:0回答:1

此线程是 Github 问题的延续:https://github.com/spring-projects/spring-data-r2dbc/issues/194

背景:

嗨,

我刚刚尝试了一个非常简单的示例,基于两个反应性存储库:

给出

br
,一个 r2dbc crud 存储库,以及
cr
,另一个 r2dbc crud 存储库:

br.findAll()
   .flatMap(br -> {
      return cr.findById(br.getPropertyOne())
               .doOnNext(c -> br.setProperty2(c))
               .thenReturn(br);
    })
   .collectList().block();

此代码示例永远不会完成(只有前 250 个左右的条目到达

.collectList
运算符)。经过一番挖掘,在
onBackpressureXXX
之后添加一些
findAll
运算符似乎可以通过......删除元素或缓冲它们来“解决”问题。

在这一点上,我的理解是 r2dbc 反应存储库不使用消费者反馈机制,这消除了 r2dbc 的很大一部分好处。

我错了吗?有没有更好的方法来实现同样的目标?

谢谢!


来自@mp911de的建议:

作为一般规则,避免在另一个流处于活动状态时创建流(名言:不要交叉流)。

如果您想获取相关数据,那么理想情况下将所有结果收集为列表,然后运行子查询。这样,初始响应流就会被消耗,并且连接可以自由地获取额外的结果。

像下面的代码片段应该可以完成这项工作:

br.findAll().collectList()
        .flatMap(it -> {

            List<Mono<Reference>> refs = new ArrayList<>();
            for (Person p : it) {
                 refs.add(cr.findById(br.getPropertyOne()).doOnNext(…));
            }

            return Flux.concat(refs).thenReturn(it);
        });

但这消除了流式传输数据而不将其全部保留在内存中的好处(我的最后一步不是列出,而是流式写入以输出到某个文件)。

这方面有什么帮助吗?

spring-data project-reactor r2dbc spring-data-r2dbc
1个回答
0
投票

我们遇到了同样的问题,github 线程引导我走向正确的道路。除了寻找以下两件事之外,没有解决此问题的灵丹妙药:

  1. 在事务内操作的任何流
  2. 在同一范围内运行的多个流

尤其是最后一点不太好找。对于OP,它是一个连续提供数据的

findAll()
(流1)和一个
flatMap
,它在读取每个发出的结果时同时修改数据(流2)。

就我而言,情况有所不同。路由原因是在

flatMap

 内执行 
flatMap
。当您传递 
Publisher
 作为参数时,可能很难找到它。假设您有以下代码:

@Transactional public Mono<Render> saveInPostgres(final Long sequenceNumber, final Mono<SomeComputeResult> previousOperations) { previousOperations.flatMap(computeResult -> { computeResult.updateSequenceNumber(sequenceNumber); Render render = computeResult.finalize(); final Mono<Render> saved = renderRepository.save(entity); ... }
我传递一个 

Mono

,然后通过 
flatMap
 执行。更新序列号后,该方法将对象保存在存储库中。当然,导致 
Mono
 的所有操作现在都会在您映射时执行。你在这里看不到的是:

  1. saveInPostgres
     本身在 
    flatMap
     中被调用
    
  2. previousOperations.flatMap()
     运行时出错,即异常
这种行为对我们来说特别痛苦,因为异常发生的时候,数据库上有锁,并且锁不断累积并耗尽我们的连接池,直到服务无响应。

解决问题的提示

所以我的 2 美分,如果你有

Publisher

 作为参数,请将它们删除。始终链接映射执行,而不是传递它们并可能在流中创建流。这与OP问题本质上是一样的。通过调用 
collectList()
,他能够链接操作,因为 
findAll()
 是在通过第二个 
flatMap
 对元素进行任何其他修改之前完成的。

© www.soinside.com 2019 - 2024. All rights reserved.