How does onBackpressure work with Java Reactor?

I'm getting started with Java Reactor and trying to understand how backpressure works. I wrote the sample code below:

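    import java.time.Duration;
    import java.util.concurrent.TimeUnit;

    import org.reactivestreams.Subscriber;
    import org.reactivestreams.Subscription;
    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;

    public class BackpressureDemo {
        public static void main(String[] args) {
            // Emits 0, 1, 2, ... once per millisecond.
            Flux<Long> publisher = Flux.interval(Duration.ofMillis(1));
            publisher
                    .subscribeOn(Schedulers.newSingle("publisher-x"))
                    .map(x -> {
                        System.out.println("mapper on Thread " + Thread.currentThread().getName() + " for item " + x);
                        return x;
                    })
                    // Hand items over to a dedicated thread for the slow subscriber.
                    .publishOn(Schedulers.newSingle("subscriber-x"))
                    .subscribe(new Subscriber<Long>() {
                        private Subscription subscription;
                        private int count = 0;

                        @Override
                        public void onSubscribe(Subscription subscription) {
                            this.subscription = subscription;
                            // Ask for an unbounded number of items up front.
                            subscription.request(Long.MAX_VALUE);
                        }

                        @Override
                        public void onNext(Long item) {
                            try {
                                // Simulate a slow consumer: 5 seconds per item.
                                TimeUnit.MILLISECONDS.sleep(5000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("subscriber on Thread " + Thread.currentThread().getName() + " for item " + item);
                            count++;
                            /*if (count >= 5) {
                                subscription.cancel();
                            } else {
                                subscription.request(1);
                            }*/
                        }

                        @Override
                        public void onError(Throwable throwable) {
                            throwable.printStackTrace();
                        }

                        @Override
                        public void onComplete() {
                            System.out.println("Done in thread " + Thread.currentThread().getName());
                        }
                    });
        }
    }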

I expected my publisher to keep producing items until a backpressure exception occurred, but the result I got surprised me:

mapper on Thread parallel-1 for item 0
mapper on Thread parallel-1 for item 1
...
mapper on Thread parallel-1 for item 255
subscriber on Thread subscriber-x-1 for item 0
subscriber on Thread subscriber-x-1 for item 1
...
subscriber on Thread subscriber-x-1 for item 255
reactor.core.Exceptions$OverflowException: Could not emit tick 256 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233)
    at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:132)
    at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

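I don't understand why my publisher stops at item 255, and why I only get the exception after my subscriber has finished processing all the items (the exception is thrown when the ring buffer is empty!). Can anyone help me understand this?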
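One possible reading of the log above, offered as a sketch rather than a confirmed answer: `publishOn` does not pass the subscriber's `request(Long.MAX_VALUE)` upstream. It requests a bounded batch instead (256 by default, which would match items 0 to 255), `Flux.interval` fails as soon as a tick finds no outstanding demand, and `publishOn` by default holds the error back until its queue has been drained. The plain-Java model below imitates that sequence with a simple tick loop. It is not Reactor's implementation: `PrefetchModel`, `simulate`, and their parameters are hypothetical names, and the "replenish after 75% consumed" rule is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of a bounded prefetch queue between a
// tick source and a slow consumer. Not Reactor code.
class PrefetchModel {

    /** Outcome of one simulated run. */
    static final class Result {
        final List<Long> delivered = new ArrayList<>();
        long overflowTick = -1; // tick that found no demand, or -1 if none
    }

    /**
     * @param prefetch     demand requested upstream up front
     * @param ticksPerItem source ticks that elapse while the consumer handles one item
     * @param maxTicks     stop after this many ticks if no overflow occurs
     */
    static Result simulate(int prefetch, int ticksPerItem, int maxTicks) {
        Result result = new Result();
        Queue<Long> buffer = new ArrayDeque<>();
        // Assumption: demand is topped up once 75% of the batch is consumed.
        int replenishAt = prefetch - (prefetch >> 2);
        long demand = prefetch;
        int consumedSinceRequest = 0;

        for (long tick = 0; tick < maxTicks; tick++) {
            // Consumer side: finishes one buffered item every ticksPerItem ticks.
            if (tick > 0 && tick % ticksPerItem == 0 && !buffer.isEmpty()) {
                result.delivered.add(buffer.poll());
                if (++consumedSinceRequest == replenishAt) {
                    demand += replenishAt;
                    consumedSinceRequest = 0;
                }
            }
            // Source side: a tick with no outstanding demand is an overflow.
            if (demand == 0) {
                result.overflowTick = tick;
                break;
            }
            buffer.add(tick);
            demand--;
        }
        // Delayed error: everything already buffered is delivered first.
        while (!buffer.isEmpty()) {
            result.delivered.add(buffer.poll());
        }
        return result;
    }

    public static void main(String[] args) {
        // 5000 ticks per item mirrors the 5 s sleep against a 1 ms interval.
        Result slow = simulate(256, 5000, 1_000_000);
        System.out.println("delivered=" + slow.delivered.size()
                + ", overflow at tick " + slow.overflowTick);
        // prints: delivered=256, overflow at tick 256
    }
}
```

With a consumer that keeps pace (for example `simulate(256, 1, 1000)`), the model never runs out of demand and no overflow is recorded. In Reactor itself, an operator such as `onBackpressureDrop()` or `onBackpressureBuffer()` placed between `Flux.interval` and `publishOn` is the usual way to absorb ticks that arrive without demand.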

java project-reactor