在Spring Project Reactor中重启上游发布者时清除流中的运行中元素?

问题描述 投票:1回答:1

我有一个发布者,该发布者对MongoDB执行长时间运行的大型查询,并以Flux形式返回数据。在数据库中标记为“已处理”的实体将被过滤掉,然后对实体进行缓冲并传递给concatMap运算符(以便在处理下一个缓冲区中的元素之前先处理所有缓冲的≤元素)。看起来像这样:

Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();
entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
                   .buffer(10)
                   .concatMap(bufferedEntityList ->  
                                    Flux.fromIterable(bufferedEntityList)
                                        .flatMap(makeExternalCall)
                                        .then()));

makeExternalCall调用第三方远程服务器的地方在进行调用后将实体设置为processed。在大多数情况下,这可以正常工作,但是当远程服务器确实很慢或出现错误时,makeExternalCall将重试(以指数补偿)对远程服务器的操作。在某些情况下,可能需要一段时间才能处理完所有10个外部呼叫。实际上,可能需要很长时间才能重新启动myMongoRepository.executeLargeQuery()发布者,然后再次执行查询。现在我们遇到一个问题,我将在这里尝试描述:

  1. 实体A是从数据库中读取的(即,它以myMongoRepository.executeLargeQuery()产生的通量返回)。它尚未标记为“已处理”,这意味着entity.isProcessed()将返回false,并将其保留在流中。
  2. 外部服务器的速度确实很慢或很慢,因此makeExternalCall被迫重试操作之前实体A在数据库中已被标记为“已处理”。
  3. myMongoRepository.executeLargeQuery()重新启动,并再次执行查询。
  4. 实体A再次从数据库中读取。但是问题在于,实体A的另一个实例正在进行中,因为先前对myMongoRepository.executeLargeQuery()的调用尚未将该实体标记为“已处理”。
  5. 这意味着实体A将被两次调用makeExternalCall,这不是最佳方法!

我可以向数据库发出附加请求,并在processed方法中为每个实体检查makeExternalCall的状态,但这将给数据库造成额外的负载(因为每个实体都需要一个额外的请求)不是最佳的。

所以我的问题是:

[以某种方式“重新启动”整个流,从而在重新启动/重新执行由myMongoRepository.executeLargeQuery()触发的MongoDB查询时,清除中间缓冲区(即从正在进行的流中删除正在运行的实体A的方法) ?还是有更好的方法来解决这个问题?

我正在使用Spring Boot 2.2.4.RELEASE,项目反应堆3.3.2.RELEASEspring-boot-starter-data-mongodb-reactive 2.2.4.RELEASE

spring mongodb spring-boot reactive-programming project-reactor
1个回答
0
投票

不确定我是否完全理解问题。但是尝试回答听起来很有趣。

由于您需要了解makeExternalCall已经在处理的请求,您能否维护一个包含正在处理的实体的集合/本地缓存?

Set<Entity> inProgress = new HashSet<>(); 

Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();

entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
                   .buffer(10)
                   .map(bufferedEntityList -> {  // remove the inprogress requests to avoid redundant processing
                        bufferedEntityList.removeIf(inProgress::contains);
                        return bufferedEntityList;
                   })
                   .concatMap(bufferedEntityList ->  
                                    inProgress.addAll(bufferedEntityList);
                                    Flux.fromIterable(bufferedEntityList)
                                        .flatMap(makeExternalCall) //assuming once processed, it emits the entity object
                                        .map(entity -> {   //clear growing set
                                            inProgress.remove(entity);
                                            return entity;
                                        })
                                        .then()));

当您需要水平扩展应用程序时,此方法不是一个好的解决方案。在这种情况下,您可以使用redis之类的外部缓存服务器来代替维护本地缓存。

© www.soinside.com 2019 - 2024. All rights reserved.