这是我编写的一个小示例应用程序:
package ru.maksim.sample.app
import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import kotlinx.android.synthetic.main.activity_main.*
import java.util.concurrent.TimeUnit
/**
 * Sample screen that pushes an integer into an Rx pipeline each time the `start` view is
 * clicked and logs every stage of that pipeline under the "SampleApp" tag.
 */
class MainActivity : AppCompatActivity() {

    /** Entry point of the pipeline; every click emits one value here. */
    private val subject = PublishSubject.create<Int>()

    /** Subscription to [observeInts]; assigned in [onCreate] and disposed in [onDestroy]. */
    private lateinit var disposable: Disposable

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        disposable = observeInts().subscribe(
            { value -> Log.d(TAG, "next=$value") },
            { error -> Log.e(TAG, "error", error) },
            { Log.d(TAG, "complete") }
        )

        start.setOnClickListener { subject.onNext(1) }
    }

    override fun onDestroy() {
        // Release the subscription before the activity itself is torn down.
        disposable.dispose()
        super.onDestroy()
    }

    /**
     * Builds the logged pipeline: bounded buffer -> computation scheduler -> log ->
     * 10 second delay -> log.
     *
     * The subject is converted with [BackpressureStrategy.MISSING] because the bounded
     * `onBackpressureBuffer` right below is the stage meant to deal with overflow; using
     * `BUFFER` here would only put an extra, unbounded buffer in front of it.
     *
     * NOTE(review): the operators after the bounded buffer (`observeOn`, `flatMap`) request
     * items in batches of their own (128 by default per the RxJava docs - confirm), so
     * "Overflow" is expected only after those internal queues are saturated, not after
     * [BUFFER_CAPACITY] clicks.
     *
     * @return a flowable re-emitting every value pushed into [subject] after the delay.
     */
    private fun observeInts(): Flowable<Int> = subject
        .toFlowable(BackpressureStrategy.MISSING)
        .onBackpressureBuffer(
            BUFFER_CAPACITY,
            { Log.d(TAG, "Overflow") },
            BackpressureOverflowStrategy.DROP_LATEST
        )
        .observeOn(Schedulers.computation())
        .flatMap { value ->
            Log.d(TAG, "onNext BEFORE delay: $value")
            Flowable.just(value)
        }
        .delay(DELAY_SECONDS, TimeUnit.SECONDS)
        .flatMap { value ->
            Log.d(TAG, "onNext AFTER delay: $value")
            Flowable.just(value)
        }

    private companion object {
        /** Logcat tag shared by every message of this sample. */
        const val TAG = "SampleApp"

        /** Capacity of the explicit backpressure buffer. */
        const val BUFFER_CAPACITY = 4L

        /** Delay applied to every item between the two logging stages. */
        const val DELAY_SECONDS = 10L
    }
}
start 只是一个按钮。
按下按钮超过 4 次之后(4 是您在 onBackpressureBuffer 中看到的缓冲区容量),
我希望在日志中看到 Overflow,但这并没有发生。
我不明白为什么。
我认为您之所以没有看到警告,是因为事件太少了:observeOn 等下游操作符默认会预取一批元素(RxJava 默认为 128 个),只有当这些内部队列被填满之后,容量为 4 的缓冲区才会开始溢出。您可以尝试把 setOnClickListener 中的回调替换为下面的代码,然后再检查一次:
// On every click, push the integers 0 through 1000 (inclusive) into the subject, in order.
start.setOnClickListener {
    (0..1000).forEach { value -> subject.onNext(value) }
}