MissingBackpressureException:由于缺少请求而无法发出缓冲区

问题描述 投票:0回答:1

[我已经收到包含RxJava MissingBackpressureException: Could not emit buffer due to lack of requestsFlowable的错误报告,但是我正在努力创建一个简单的测试用例来演示问题(维护Flowable的结构)。

这是我要组合的测试,它在管道中保持相同的阶段:

int inputEvents=10000;

CountDownLatch completed = new CountDownLatch(1);
Flowable<List<String>> flowable = Flowable.<String>create(e -> {

    System.out.println(Thread.currentThread().getName() + ": Will send");
    for (int counter = 0; counter < inputEvents; counter++) {
        e.onNext("" + counter);
        Thread.sleep(5);
    }
    System.out.println(Thread.currentThread().getName() + ": Completed sending");
    e.onComplete();
}, BackpressureStrategy.DROP)
    .onBackpressureDrop(s -> System.out.println("Backpressure, dropping " + Arrays.asList(s)))
    .buffer(1, TimeUnit.SECONDS)
    .doOnNext(strings -> System.out.println("\t" + Thread.currentThread().getName() + ": Buffered: " + strings.size() + " items"))
    .observeOn(Schedulers.io(), false)
    .doOnNext(strings -> {
        System.out.println("\t" + "\t" + Thread.currentThread().getName() + ": Waiting: " + strings.size());
        Thread.sleep(5000);
    });

flowable
    .subscribe(s -> System.out.println("\t" + "\t" + "onNext: " + s.size()),
            error -> {
                throw new RuntimeException(error);
            },
            () -> {
                System.out.println("\t" + "\t" + "Complete");
                completed.countDown();
            });

completed.await();

在生产中,我们获得带有以下堆栈跟踪的MissingBackpressureException: Could not emit buffer due to lack of requests

io.reactivex.rxjava3.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
        at io.reactivex.rxjava3.internal.subscribers.QueueDrainSubscriber.fastPathEmitMax(QueueDrainSubscriber.java:87)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableBufferTimed$BufferExactUnboundedSubscriber.run(FlowableBufferTimed.java:207)
        at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

所以我认为这与缓冲区的下游工作有关。

但是,无论我阻塞doOnNext多长时间,我都无法重现该问题。输出示例:

main: Will send
    RxComputationThreadPool-1: Buffered: 197 items
        RxCachedThreadScheduler-1: Waiting: 197
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
        onNext: 197
        RxCachedThreadScheduler-1: Waiting: 196
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
        onNext: 196
        RxCachedThreadScheduler-1: Waiting: 197
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
...

我原以为,由于Thread.sleep(5000)花费的时间太长,我们将承受背压。

是否有一种方法可以模拟此,理想情况下是使用TestScheduler / TestSubscriber进行测试(以避免Thread.sleep())?

rx-java
1个回答
1
投票
我能够通过增加事件的发出速率,增加事件的最大数量并降低使用者处理事件的速率来重现MissingBackpressureException。

溢出的缓冲区是默认的observeOn(...)运算符的大小为128的缓冲区。由于它每秒接收一次新列表,因此至少要花费几分钟的背压才能溢出。

注意,您可以通过将其作为arg传递给observeOn(...)来覆盖此默认缓冲区的大小。

回到背压处理,我认为管道的主要问题是buffer(1, TimeUnit.SECONDS)运算符。如果您查看javadoc:

背压:此运算符不支持背压,因为它使用了时间。它向上游请求Long.MAX_VALUE,不向下游服从请求。

由于上述原因,您的onBackPressureDrop(...)从未被调用。我认为您可以通过在onBackPressureDrop(...)之后放置buffer(...)来解决此问题。这样做会产生您的Backpressure, dropping...消息。

您应该可以使用以下方法进行单元测试:TestScheduler.advanceTimeBy(long, TimeUnit)。尽管我必须承认,但我还没有尝试过。

© www.soinside.com 2019 - 2024. All rights reserved.