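I have a stream of events, each carrying a time (Instant). I want to split it into chunks of a given size (Duration), starting from the time of the first event. If nothing arrives within a given period, I also want an empty chunk to be emitted.
An example:
Input: [t0, t1, t2, t3, t7, t8, t9, t10]
Output with a window size of 2: [[t0, t1], [t2, t3], [], [t7], [t8, t9], [t10]]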
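To make the expected behavior concrete, here is a minimal non-reactive sketch of the chunking rule on a plain list. It assumes the events are already ordered by time, that windows are half-open intervals [start, start + size), and that `startAt`, when given, is not later than the first event. `Event` and this list-based `timeChunks` are hypothetical illustrations, and `TimeAware` is only a stand-in for the type used in the code further down.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical stand-in for the TimeAware type used in the question.
interface TimeAware { val time: Instant }

// Hypothetical event type, used only for this illustration.
data class Event(override val time: Instant) : TimeAware

// Splits time-ordered events into consecutive windows [start, start + size),
// adding an empty list for every window in which nothing happened.
fun <T : TimeAware> timeChunks(events: List<T>, size: Duration, startAt: Instant? = null): List<List<T>> {
    require(!size.isNegative && !size.isZero) { "Size must be positive: $size" }
    if (events.isEmpty()) return emptyList()
    val chunks = mutableListOf<List<T>>()
    var start = startAt ?: events.first().time
    var current = mutableListOf<T>()
    for (event in events) {
        // Close every window that ends at or before this event's time.
        while (!event.time.isBefore(start.plus(size))) {
            chunks.add(current)
            current = mutableListOf()
            start = start.plus(size)
        }
        current.add(event)
    }
    // The window that holds the last event is still open; close it.
    chunks.add(current)
    return chunks
}

fun main() {
    val t0 = Instant.EPOCH
    val events = listOf(0, 1, 2, 3, 7, 8, 9, 10).map { Event(t0.plusSeconds(it.toLong())) }
    val chunks = timeChunks(events, Duration.ofSeconds(2))
    // prints [[0, 1], [2, 3], [], [7], [8, 9], [10]]
    println(chunks.map { chunk -> chunk.map { it.time.epochSecond } })
}
```

With one-second spacing standing in for t0, t1, and so on, this reproduces the output listed above, including the single empty chunk for the gap between t3 and t7.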
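Question:
I have tried creating a helper window publisher that emits the instants at which a new window should be created. I am sharing what I have below for reference, but it does not work.
Code: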
/**
 * Creates a [Flowable] of chunks of time. It may return empty chunks
 * if nothing happened during the set period.
 *
 * ie: [1, 2, 3, 4] -> [[1, 2], [3, 4]]
 */
fun <T : TimeAware> Flowable<T>.timeChunks(size: Duration, startAt: Instant? = null): TimeChunksFlowable<T> {
    return TimeChunksFlowable(this, size, startAt)
}
class TimeChunksFlowable<T : TimeAware> (
    private val source: Flowable<T>,
    val size: Duration,
    private var startAt: Instant? = null
) : Flowable<TimeBracketFlowable<T>>() {

    init {
        require(!size.isNegative && !size.isZero) { "Size must be positive: $size" }
    }

    override fun subscribeActual(s: Subscriber<in TimeBracketFlowable<T>>) {
        source
            .publish { it.window() }
            .subscribe(s)
    }

    private fun Flowable<T>.window(): Flowable<TimeBracketFlowable<T>> {
        val chunks = chunks()
        return window(chunks).brackets(chunks)
    }

    private fun Flowable<Flowable<T>>.brackets(starts: Flowable<Instant>): Flowable<TimeBracketFlowable<T>> {
        val bi = BiFunction { f: Flowable<T>, i: Instant -> f.timeBracket(i, i + size) }
        return zipWith(starts, bi)
    }

    /**
     * Creates a helper [Flowable] which emits an event every time we need to create a new chunk.
     */
    private fun Flowable<T>.chunks(): ChunkStartsFlowable<T> {
        return ChunkStartsFlowable(this, size, startAt)
    }

    private class ChunkStartsFlowable<T : TimeAware> (
        private val source: Flowable<T>,
        private val size: Duration,
        private val startAt: Instant?
    ) : Flowable<Instant>() {

        override fun subscribeActual(s: Subscriber<in Instant>) {
            val subscriber = InternalSubscriber<T>(size, startAt)
            source.subscribe(subscriber)
            subscriber.publisher.subscribe(s)
        }

        private class InternalSubscriber<T : TimeAware> (
            private val size: Duration,
            private var startAt: Instant?
        ) : Subscriber<T> {

            val publisher = UnicastProcessor.create<Instant>()

            override fun onSubscribe(s: Subscription) {
                publisher.onSubscribe(s)
            }

            override fun onNext(t: T) {
                var start = startAt ?: t.time
                // add empty chunks since nothing happened
                while (start + size <= t.time) {
                    publisher.onNext(start)
                    start += size
                }
                startAt = start
            }

            override fun onError(t: Throwable) {
                publisher.onError(t)
            }

            override fun onComplete() {
                startAt?.apply(publisher::onNext)
                publisher.onComplete()
            }
        }
    }
}
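For comparison, the sequence of instants that the helper publisher described above is meant to produce can be sketched without any Rx machinery. This is only an illustration under the same assumptions as the example (time-ordered input, one start per window, empty windows included); `chunkStarts` is a hypothetical name and is not part of the code above.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper: returns the start instant of every window needed to cover
// the given time-ordered event times, including windows that stay empty.
fun chunkStarts(times: List<Instant>, size: Duration, startAt: Instant? = null): List<Instant> {
    require(!size.isNegative && !size.isZero) { "Size must be positive: $size" }
    if (times.isEmpty()) return emptyList()
    val starts = mutableListOf<Instant>()
    var start = startAt ?: times.first()
    val last = times.last()
    // Add a start for each window that begins at or before the last event.
    while (!start.isAfter(last)) {
        starts.add(start)
        start = start.plus(size)
    }
    return starts
}

fun main() {
    val t0 = Instant.EPOCH
    val times = listOf(0, 1, 2, 3, 7, 8, 9, 10).map { t0.plusSeconds(it.toLong()) }
    // prints [0, 2, 4, 6, 8, 10]
    println(chunkStarts(times, Duration.ofSeconds(2)).map { it.epochSecond })
}
```

For the example input this yields six starts, one per expected chunk, which is the count the pairing of windows with start instants would need.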
UPDATE: After thinking about this problem some more, I realized that buffer should be enough:
upstream.buffer(
    windowSize.toMillis(),
    windowSize.toMillis(),
    TimeUnit.MILLISECONDS);