我的代码:
private fun <T> register(cls: Class<T>): Flowable<Pair<T, Long>> {
return FlowableFromObservable(mRelay).onBackpressureBuffer(4).filter(
/* filter target event. */
EventPredictable(cls)
).cast(
/* cast to target event */
cls
).onBackpressureDrop {
Log.i(TAG, "drop event: $it")
}.concatMap { data ->
/* start interval task blocking */
val period = 1L
val unit = TimeUnit.SECONDS
MLog.d(TAG, "startInterval: data = $data")
Flowable.interval(0, period, unit).take(DURATION.toLong()).takeUntil(
getStopFlowable()
).map {
Pair(data, it)
}
}
}
private fun getStopFlowable(): Flowable<StopIntervalEvent> {
return RxBus.getDefault().register(StopIntervalEvent::class.java)
.toFlowable(BackpressureStrategy.LATEST)
}
当我在10毫秒内发送140事件时,我的代码丢失12事件,而不是我期望的140-4 = 136事件。为什么我的代码不能像上图那样工作?感谢您的收看和回答!
onBackpressurDrop
随时准备接收项目,因此onBackpressureBuffer
在您的设置中没有实际作用。 onBackpressurBuffer(int)
将在溢出时失败,因此您永远不会使用它来预期的行为。另外,concatMap
默认情况下会预先提取2个项目,因此它将获取源项目1和2。