Understanding the capacity parameter in RxJava onBackpressureBuffer


Here is a small sample app I wrote:

package ru.maksim.sample.app

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import kotlinx.android.synthetic.main.activity_main.*
import java.util.concurrent.TimeUnit


class MainActivity : AppCompatActivity() {
    private val subject = PublishSubject.create<Int>()
    private lateinit var disposable: Disposable
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        disposable = observeInts()
            .subscribe(
                {
                    Log.d("SampleApp", "next=$it")
                },
                {
                    Log.e("SampleApp", "error", it)
                },
                {
                    Log.d("SampleApp", "complete")
                }
            )
        start.setOnClickListener {
            subject.onNext(1)
        }
    }

    override fun onDestroy() {
        disposable.dispose()
        super.onDestroy()
    }

    private fun observeInts() = subject
        .toFlowable(BackpressureStrategy.BUFFER)
        .onBackpressureBuffer(4, {
            Log.d("SampleApp", "Overflow")
        }, BackpressureOverflowStrategy.DROP_LATEST)
        .observeOn(Schedulers.computation())
        .flatMap {
            Log.d("SampleApp", "onNext BEFORE delay: $it")
            Flowable.just(it)
        }
        .delay(10L, TimeUnit.SECONDS)
        .flatMap {
            Log.d("SampleApp", "onNext AFTER delay: $it")
            Flowable.just(it)
        }
}

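start is just a button. After pressing it more than 4 times (4 being the buffer capacity passed to onBackpressureBuffer), I expect to see Overflow in the log, but it never appears. I don't understand why.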

android rx-java backpressure
1 Answer

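I think you are not seeing the Overflow message because there are too few events. Operators downstream of onBackpressureBuffer, such as observeOn, request a batch of items up front (128 by default), so a handful of clicks passes straight through and never fills the 4-item buffer. Try replacing your callback in setOnClickListener with the following and check again: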

start.setOnClickListener {
    // Emit a burst of 1001 values per click so that downstream demand runs out
    for (i in 0..1000) {
        subject.onNext(i)
    }
}
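To see where the capacity of 4 comes into play without an Android device, here is a rough sketch that runs as a plain JVM program with RxJava 3 on the classpath. The helper countOverflows and its withObserveOn flag are names I made up for illustration, and test(0) (a subscriber that requests nothing) stands in for your slow delay stage. It is an approximation of your pipeline, not a copy of it.

```kotlin
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: pushes itemCount values through a 4-item bounded
// buffer and returns how many times the overflow callback fired.
fun countOverflows(itemCount: Int, withObserveOn: Boolean): Int {
    val subject = PublishSubject.create<Int>()
    val overflows = AtomicInteger()

    val buffered = subject
        .toFlowable(BackpressureStrategy.BUFFER)
        .onBackpressureBuffer(
            4L,
            { overflows.incrementAndGet() },
            BackpressureOverflowStrategy.DROP_LATEST
        )

    // observeOn requests its own prefetch amount from the buffer on subscribe,
    // independently of what the final subscriber asks for.
    val pipeline =
        if (withObserveOn) buffered.observeOn(Schedulers.computation()) else buffered

    // Assumption: a subscriber with zero initial demand models a consumer
    // that is too slow to take anything during the burst.
    val subscriber = pipeline.test(0L)

    for (i in 1..itemCount) {
        subject.onNext(i)
    }
    subscriber.cancel()
    return overflows.get()
}

fun main() {
    println("no observeOn, 5 items:    ${countOverflows(5, false)} overflow(s)")
    println("observeOn,    5 items:    ${countOverflows(5, true)} overflow(s)")
    println("observeOn,    1001 items: ${countOverflows(1001, true)} overflow(s)")
}
```

With the default buffer size I would expect the first call to report an overflow on the fifth item, the second to report none (observeOn has already pulled those items out of the bounded buffer), and the third to report many. In your app the flatMap and delay stages sit further downstream and hold additional items, so the exact number of clicks needed before Overflow shows up will be higher than in this sketch.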