是否可以在运行时更改Flowable.interval周期?
LOGGER.info("Start generating bullshit for 7 seconds:");
Flowable.interval(3, TimeUnit.SECONDS)
.map(tick -> random.nextInt(100))
.subscribe(tick -> LOGGER.info("tick = " + tick));
TimeUnit.SECONDS.sleep(7);
LOGGER.info("Change interval to 2 seconds:");
我有一个解决方法,但是最好的方法是创建一个新的运算符。
您有一个触发源,它将在开始新的时间间隔时提供值。源以一定间隔作为内部流被switchMapped。内部流采用上游源的输入值来设置新的间隔时间。
当源发出时间(长)时,将调用switchMap lambda,并且将立即订阅返回的Flowable。当新值到达switchMap时,将取消订阅内部订阅的Flowable间隔,并且将再次调用lambda。返回的Inverval-Flowable将被重新订阅。
这意味着,每次从源发出时,都会创建一个新的Inveral。
当内订订阅并且将要发出新值并且从源发出新值时,将取消订阅内部流(inverval)。因此,不再发出该值。新的Interval-Flowable已订阅,并将为其配置发出值。
lateinit var scheduler: TestScheduler
@Before
fun init() {
scheduler = TestScheduler()
}
@Test
fun `62232235`() {
val trigger = PublishSubject.create<Long>()
val switchMap = trigger.toFlowable(BackpressureStrategy.LATEST)
// make sure, that a value is emitted from upstream, in order to make sure, that at least one interval emits values, when the upstream-sources does not provide a seed value.
.startWith(3)
.switchMap {
Flowable.interval(it, TimeUnit.SECONDS, scheduler)
.map { tick: Long? ->
tick
}
}
val test = switchMap.test()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
test.assertValues(0, 1, 2)
// send new onNext value at absolute time 10
trigger.onNext(10)
// the inner stream is unsubscribed and a new stream with inverval(10) is subscribed to. Therefore the first vale will be emitted at 20 (current: 10 + 10 configured)
scheduler.advanceTimeTo(21, TimeUnit.SECONDS)
// if the switch did not happen, there would be 7 values
test.assertValues(0, 1, 2, 0)
}