RxJava onBackPressureBuffer和onBackPressureDrop的困惑

问题描述 投票:0回答:1

我希望我的代码像这张图一样工作,但是它不起作用...enter image description here

我的代码:

    private fun <T> register(cls: Class<T>): Flowable<Pair<T, Long>> {
        return FlowableFromObservable(mRelay).onBackpressureBuffer(4).filter(
            /* filter target event. */
            EventPredictable(cls)
        ).cast(
            /* cast to target event */
            cls
        ).onBackpressureDrop {
            Log.i(TAG, "drop event: $it")
        }.concatMap { data ->
            /* start interval task blocking */
            val period = 1L
            val unit = TimeUnit.SECONDS
            MLog.d(TAG, "startInterval: data = $data")
            Flowable.interval(0, period, unit).take(DURATION.toLong()).takeUntil(
                getStopFlowable()
            ).map {
                Pair(data, it)
            }
        }
    }

    private fun getStopFlowable(): Flowable<StopIntervalEvent> {
        return RxBus.getDefault().register(StopIntervalEvent::class.java)
                    .toFlowable(BackpressureStrategy.LATEST)
    }

当我在10毫秒内发送140事件时,我的代码丢失12事件,而不是我期望的140-4 = 136事件。为什么我的代码不能像上图那样工作?感谢您的收看和回答!

android rx-java backpressure
1个回答
0
投票

onBackpressurDrop随时准备接收项目,因此onBackpressureBuffer在您的设置中没有实际作用。 onBackpressurBuffer(int)将在溢出时失败,因此您永远不会使用它来预期的行为。另外,concatMap默认情况下会预先提取2个项目,因此它将获取源项目1和2。

© www.soinside.com 2019 - 2024. All rights reserved.