Flowable window based on a property


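I have a stream of events, each of which carries a time (Instant). I want to split it into chunks of a given size (Duration), starting from the time of the first event. If nothing arrives during a given period, I also want to emit an empty chunk.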

An example:

Input: [t0, t1, t2, t3, t7, t8, t9, t10]

Output with a window size of 2: [[t0, t1], [t2, t3], [], [t7], [t8, t9], [t10]]
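To pin down the intended semantics, here is a minimal sketch of the same chunking over a plain, time-ordered list, with no RxJava involved. The class name `TimeChunks`, the method `chunk`, and the use of bare `Instant` values in place of the events are assumptions made for illustration; windows are treated as half-open, [start, start + size).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class TimeChunks {

    /**
     * Splits time-ordered instants into consecutive windows of the given size,
     * starting at the first instant. Windows without events come out as empty lists.
     */
    static List<List<Instant>> chunk(List<Instant> events, Duration size) {
        List<List<Instant>> chunks = new ArrayList<>();
        if (events.isEmpty()) {
            return chunks;
        }
        Instant windowStart = events.get(0);
        List<Instant> current = new ArrayList<>();
        for (Instant t : events) {
            // Close every window that ends at or before t.
            while (!t.isBefore(windowStart.plus(size))) {
                chunks.add(current);
                current = new ArrayList<>();
                windowStart = windowStart.plus(size);
            }
            current.add(t);
        }
        chunks.add(current); // the last, still-open window
        return chunks;
    }

    public static void main(String[] args) {
        List<Instant> input = new ArrayList<>();
        for (long s : new long[] {0, 1, 2, 3, 7, 8, 9, 10}) {
            input.add(Instant.ofEpochSecond(s));
        }
        List<Integer> sizes = new ArrayList<>();
        for (List<Instant> c : chunk(input, Duration.ofSeconds(2))) {
            sizes.add(c.size());
        }
        System.out.println(sizes); // [2, 2, 0, 1, 2, 1]
    }
}
```

With t0..t10 taken as seconds 0..10 and a window of 2 seconds, the chunk sizes match the expected output above, including the empty chunk for the window starting at t4.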

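Problem:

I have tried creating a helper window publisher that emits the instants at which a new window should be created. I'm sharing what I have below for reference, but it doesn't work.

Code: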

/**
 * Creates a [Flowable] of chunks of time. It may return empty chunks
 * if nothing happened during the set period.
 *
 * ie: [1, 2, 3, 4] -> [[1, 2], [3, 4]]
 */
fun <T : TimeAware> Flowable<T>.timeChunks(size: Duration, startAt: Instant? = null): TimeChunksFlowable<T> {
    return TimeChunksFlowable(this, size, startAt)
}

class TimeChunksFlowable<T : TimeAware> (
    private val source: Flowable<T>,
    val size: Duration,
    private var startAt: Instant? = null
) : Flowable<TimeBracketFlowable<T>>() {

    init {
        require(!size.isNegative && !size.isZero) { "Size must be positive: $size" }
    }

    override fun subscribeActual(s: Subscriber<in TimeBracketFlowable<T>>) {
        source
            .publish { it.window() }
            .subscribe(s)
    }

    private fun Flowable<T>.window(): Flowable<TimeBracketFlowable<T>> {
        val chunks = chunks()
        return window(chunks).brackets(chunks)
    }

    private fun Flowable<Flowable<T>>.brackets(starts: Flowable<Instant>): Flowable<TimeBracketFlowable<T>> {
        val bi = BiFunction { f: Flowable<T>, i: Instant -> f.timeBracket(i, i + size) }
        return zipWith(starts, bi)
    }

    /**
     * Creates a helper [Flowable] which emits an event every time we need to create a new chunk.
     */
    private fun Flowable<T>.chunks(): ChunkStartsFlowable<T> {
        return ChunkStartsFlowable(this, size, startAt)
    }

    private class ChunkStartsFlowable<T : TimeAware> (
        private val source: Flowable<T>,
        private val size: Duration,
        private val startAt: Instant?
    ) : Flowable<Instant>() {

        override fun subscribeActual(s: Subscriber<in Instant>) {

            val subscriber = InternalSubscriber<T>(size, startAt)
            source.subscribe(subscriber)
            subscriber.publisher.subscribe(s)
        }

        private class InternalSubscriber<T : TimeAware> (
            private val size: Duration,
            private var startAt: Instant?
        ) : Subscriber<T> {

            val publisher = UnicastProcessor.create<Instant>()

            override fun onSubscribe(s: Subscription) {
                publisher.onSubscribe(s)
            }

            override fun onNext(t: T) {
                var start = startAt ?: t.time

                // add empty chunks since nothing happened
                while (start + size <= t.time) {
                    publisher.onNext(start)
                    start += size
                }

                startAt = start
            }

            override fun onError(t: Throwable) {
                publisher.onError(t)
            }

            override fun onComplete() {
                startAt?.apply(publisher::onNext)
                publisher.onComplete()
            }

        }

    }
}
java kotlin rx-java reactive-programming
1 Answer

UPDATE: After thinking about this problem some more, I found that buffer should be enough:

upstream.buffer(
    windowSize.toMillis(),
    windowSize.toMillis(),
    TimeUnit.MILLISECONDS);
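
One thing to keep in mind: buffer(timespan, timeskip, unit) cuts buffers by the scheduler's clock (the computation scheduler by default), that is, by when items arrive, and it does emit empty lists for quiet periods. It does not look at the Instant each event carries. If the chunks have to follow the events' own timestamps, a stateful helper along these lines might be closer to what the question asks for. This is only a sketch: `EventTimeChunker`, `accept` and `finish` are hypothetical names, and bare `Instant` values stand in for the question's `TimeAware` events.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: cuts chunks by event time rather than by arrival time. */
final class EventTimeChunker {
    private final Duration size;
    private Instant windowStart;                        // null until the first event arrives
    private List<Instant> current = new ArrayList<>();  // the window currently being filled

    EventTimeChunker(Duration size) {
        if (size.isNegative() || size.isZero()) {
            throw new IllegalArgumentException("Size must be positive: " + size);
        }
        this.size = size;
    }

    /**
     * Feeds one event (events are assumed to arrive in time order).
     * Returns the chunks this event closed: none, one, or several when
     * whole windows passed without any event.
     */
    List<List<Instant>> accept(Instant time) {
        List<List<Instant>> closed = new ArrayList<>();
        if (windowStart == null) {
            windowStart = time; // the first event anchors the first window
        }
        while (!time.isBefore(windowStart.plus(size))) {
            closed.add(current);
            current = new ArrayList<>();
            windowStart = windowStart.plus(size);
        }
        current.add(time);
        return closed;
    }

    /** Call once the source completes; returns the last, still-open chunk. */
    List<Instant> finish() {
        List<Instant> last = current;
        current = new ArrayList<>();
        return last;
    }
}
```

In an RxJava pipeline, one chunker per subscription could probably be driven with something like concatMapIterable(chunker::accept), with finish() appended on completion; that wiring is untested here and is left as an assumption.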