我正在尝试实施轮询机制。我想根据某些条件增加或减少轮询间隔。我正在将Mono.repeat与delayElements配合使用,以一定的间隔执行重复任务,但是我无法找到一种基于某些条件修改延迟的方法。
Mono.just(1).
repeat().
delayElements(getPollingInterval()).
takeUntil((s)->
{
if(checkForEndCriteria()){
log.info("Critera to end reached);
return true;
}
return false;
}).
log().
subscribeOn(Schedulers.boundedElastic()).
flatMapSequential(x -> {
List<Event> eventList = getEvents(id, lastItemTimeStamp);;
if (!eventList.isEmpty()) {
//Recieving events now. So want to decrease the interval.
return Flux.fromIterable(eventList);
} else {
//There are no events happening .So I would like
//to increase the delay of repeat task by 1 sec
return Flux.just(buildHeartBeatEvent());
}
}).
onErrorResume(error -> {
log.error("Error occurred", error);
return Flux.error(error);
});```
我用Flux.<Duration>generate()
实现了此功能:
Flux
.<Duration>generate(sink -> {
Date date = [NEXT_DATE];
if (date != null) {
long millis = date.getTime() - System.currentTimeMillis();
sink.next(Duration.ofMillis(millis));
}
else {
sink.complete();
}
})
.concatMap(duration ->
Mono.delay(duration)
...
)
.repeat();
因此,每次我们以generate()
返回repeat()
时,我们都可以查看某种状态以获得下一个执行Date
。