控制对Mono执行重复任务的延迟

问题描述 投票:0回答:1

我正在尝试实施轮询机制。我想根据某些条件增加或减少轮询间隔。我正在将Mono.repeat与delayElements配合使用,以一定的间隔执行重复任务,但是我无法找到一种基于某些条件修改延迟的方法。

Mono.just(1).
    repeat().
    delayElements(getPollingInterval()).
    takeUntil((s)->
      {

          if(checkForEndCriteria()){
              log.info("Critera to end reached);
              return true;
          }
          return false;
      }).
    log().
    subscribeOn(Schedulers.boundedElastic()).
    flatMapSequential(x -> {
        List<Event> eventList = getEvents(id, lastItemTimeStamp);;
        if (!eventList.isEmpty()) {
            //Recieving events now. So want to decrease the interval.
            return Flux.fromIterable(eventList);
        } else {
        //There are no events happening .So I would like
        //to increase the delay of repeat task by 1 sec

            return Flux.just(buildHeartBeatEvent());
        }
    }).
    onErrorResume(error -> {
        log.error("Error occurred", error);
        return Flux.error(error);
    });```
spring-boot project-reactor reactor
1个回答
0
投票

我用Flux.<Duration>generate()实现了此功能:

        Flux
            .<Duration>generate(sink -> {
                Date date = [NEXT_DATE];
                if (date != null) {
                    long millis = date.getTime() - System.currentTimeMillis();
                    sink.next(Duration.ofMillis(millis));
                }
                else {
                    sink.complete();
                }
            })
            .concatMap(duration ->
                    Mono.delay(duration)
                    ...
            )
            .repeat();

因此,每次我们以generate()返回repeat()时,我们都可以查看某种状态以获得下一个执行Date

© www.soinside.com 2019 - 2024. All rights reserved.