我如何更改Flowable.interval的期限

问题描述 投票:1回答:1

是否可以在运行时更改Flowable.interval周期?

    LOGGER.info("Start generating bullshit for 7 seconds:");

    Flowable.interval(3, TimeUnit.SECONDS)
        .map(tick -> random.nextInt(100))
        .subscribe(tick -> LOGGER.info("tick = " + tick));
    TimeUnit.SECONDS.sleep(7);

    LOGGER.info("Change interval to 2 seconds:");
rx-java2
1个回答
0
投票

我有一个解决方法,但是最好的方法是创建一个新的运算符。

此解决方案如何工作?

您有一个触发源,它将在开始新的时间间隔时提供值。源以一定间隔作为内部流被switchMapped。内部流采用上游源的输入值来设置新的间隔时间。

switchMap

当源发出时间(长)时,将调用switchMap lambda,并且将立即订阅返回的Flowable。当新值到达switchMap时,将取消订阅内部订阅的Flowable间隔,并且将再次调用lambda。返回的Inverval-Flowable将被重新订阅。

这意味着,每次从源发出时,都会创建一个新的Inveral。

它的行为如何?

当内订订阅并且将要发出新值并且从源发出新值时,将取消订阅内部流(inverval)。因此,不再发出该值。新的Interval-Flowable已订阅,并将为其配置发出值。

解决方案

lateinit var scheduler: TestScheduler

@Before
fun init() {
    scheduler = TestScheduler()
}

@Test
fun `62232235`() {
    val trigger = PublishSubject.create<Long>()

    val switchMap = trigger.toFlowable(BackpressureStrategy.LATEST)
        // make sure, that a value is emitted from upstream, in order to make sure, that at least one interval emits values, when the upstream-sources does not provide a seed value.
        .startWith(3)
        .switchMap {
            Flowable.interval(it, TimeUnit.SECONDS, scheduler)
                .map { tick: Long? ->
                    tick
                }
        }

    val test = switchMap.test()

    scheduler.advanceTimeBy(10, TimeUnit.SECONDS)

    test.assertValues(0, 1, 2)

    // send new onNext value at absolute time 10
    trigger.onNext(10)

    // the inner stream is unsubscribed and a new stream with inverval(10) is subscribed to. Therefore the first vale will be emitted at 20 (current: 10 + 10 configured)
    scheduler.advanceTimeTo(21, TimeUnit.SECONDS)

    // if the switch did not happen, there would be 7 values
    test.assertValues(0, 1, 2, 0)
}
© www.soinside.com 2019 - 2024. All rights reserved.