我有一个发布者,该发布者对MongoDB执行长时间运行的大型查询,并以Flux形式返回数据。在数据库中标记为“已处理”的实体将被过滤掉,然后对实体进行缓冲并传递给concatMap运算符(以便在处理下一个缓冲区中的元素之前先处理所有缓冲的≤元素)。看起来像这样:
Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();
entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
.buffer(10)
.concatMap(bufferedEntityList ->
Flux.fromIterable(bufferedEntityList)
.flatMap(makeExternalCall)
.then()));
在makeExternalCall
调用第三方远程服务器的地方和在进行调用后将实体设置为processed
。在大多数情况下,这可以正常工作,但是当远程服务器确实很慢或出现错误时,makeExternalCall
将重试(以指数补偿)对远程服务器的操作。在某些情况下,可能需要一段时间才能处理完所有10个外部呼叫。实际上,可能需要很长时间才能重新启动myMongoRepository.executeLargeQuery()
发布者,然后再次执行查询。现在我们遇到一个问题,我将在这里尝试描述:
myMongoRepository.executeLargeQuery()
产生的通量返回)。它尚未标记为“已处理”,这意味着entity.isProcessed()
将返回false
,并将其保留在流中。makeExternalCall
被迫重试操作之前实体A在数据库中已被标记为“已处理”。myMongoRepository.executeLargeQuery()
重新启动,并再次执行查询。 myMongoRepository.executeLargeQuery()
的调用尚未将该实体标记为“已处理”。makeExternalCall
,这不是最佳方法!我可以向数据库发出附加请求,并在processed
方法中为每个实体检查makeExternalCall
的状态,但这将给数据库造成额外的负载(因为每个实体都需要一个额外的请求)不是最佳的。
所以我的问题是:
[以某种方式“重新启动”整个流,从而在重新启动/重新执行由myMongoRepository.executeLargeQuery()
触发的MongoDB查询时,清除中间缓冲区(即从正在进行的流中删除正在运行的实体A的方法) ?还是有更好的方法来解决这个问题?
我正在使用Spring Boot 2.2.4.RELEASE
,项目反应堆3.3.2.RELEASE
和spring-boot-starter-data-mongodb-reactive
2.2.4.RELEASE
。
不确定我是否完全理解问题。但是尝试回答听起来很有趣。
由于您需要了解makeExternalCall
已经在处理的请求,您能否维护一个包含正在处理的实体的集合/本地缓存?
Set<Entity> inProgress = new HashSet<>();
Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();
entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
.buffer(10)
.map(bufferedEntityList -> { // remove the inprogress requests to avoid redundant processing
bufferedEntityList.removeIf(inProgress::contains);
return bufferedEntityList;
})
.concatMap(bufferedEntityList ->
inProgress.addAll(bufferedEntityList);
Flux.fromIterable(bufferedEntityList)
.flatMap(makeExternalCall) //assuming once processed, it emits the entity object
.map(entity -> { //clear growing set
inProgress.remove(entity);
return entity;
})
.then()));
当您需要水平扩展应用程序时,此方法不是一个好的解决方案。在这种情况下,您可以使用redis
之类的外部缓存服务器来代替维护本地缓存。