我有一个可观察的序列。当插入第一个元素时,我想启动一个计时器,并在计时器的时间范围内对随后插入的元素进行批处理。然后,在序列中插入另一个元素之前,计时器不会再次启动。
--------|=====timespan====|---------------|=====timespan====|-------------->
1 2 3 4 5 6
将产生:
[1,2,3,4,5], [6]
[我尝试过使用Observable.buffer()
和timespan
,但是从我的实验中,我可以看到,只要我们订阅了可观察的序列,计时器就会启动,而上一个计时器完成后,计时器就会重新启动。
因此具有与上一个示例相同的顺序,并且将buffer()
与timespan
一起使用,我将得到这样的内容:
|=====timespan====|=====timespan====|=====timespan====|=====timespan====|-->
1 2 3 4 5 6
将产生此结果:
[1,2,3,4], [], [5,6], []
这与29858974本质上是相同的问题,但仅适用于java。
所以问题是,由于我不想过多地延迟流,所以我希望有一个非常短的计时器,而且该计时器会非常耗费时间。我可以只过滤空列表,但是我认为这对CPU的影响太大。
window
运算符将充当buffer
,您不能直接使用它。
想法是通过第一个可观测值(我称为timer
)的发射来控制insertions
。为此,您必须包含第三个参数以链接两个可观察对象(解决方案下面的stopWatch
主题)。
@Test
public void stop_watch_observable() {
Subject<Long> stopWatch = PublishSubject.create();
Observable<Long> insertions = insertions();
//share to use it as a timer (looking for the first emission)
//and to recieve the items
Observable<Long> shared = insertions.share();
//for each emission of insertions we start a new timer
//but only the first one is emitted
//the others are stopped by the takeUntil(stopWatch)
Observable<Long> window = shared
.flatMap(e -> Observable.timer(3, TimeUnit.SECONDS).takeUntil(stopWatch));
shared.buffer(window)
//each time a window is generated we kill all the current timers
.doOnNext(e -> stopWatch.onNext(0L))
.blockingSubscribe(System.out::println);
}
// insertions generator which is comming randomly
private Observable<Long> insertions() {
AtomicLong al = new AtomicLong(0);
return Observable.generate((Emitter<Long> emitter) -> {
if (al.getAndIncrement() % 4 == 0) {
Long timeToWait = Long.parseLong(RandomStringUtils.randomNumeric(1));
System.out.println("sleeping for: " + timeToWait);
sleep(timeToWait * 1000);
} else {
sleep(500);
}
emitter.onNext(al.get());
}).subscribeOn(Schedulers.newThread());
}
第一个解决方案的缺点是每次插入插入都会启动timer
(这可能会占用大量CPU资源)。这里的另一个解决方案是一次只启动一个计时器(我认为这样更有效:
@Test
public void stop_watch_observable() {
Observable<Long> insertions = insertions();
Observable<Long> shared = insertions.share();
AtomicBoolean timerOn = new AtomicBoolean(false);
Observable<Long> window = shared
.flatMap(e -> timerOn.get() ? Observable.empty() : Observable.timer(3, TimeUnit.SECONDS)
.doOnSubscribe(x -> timerOn.set(true))
);
shared.buffer(window)
//each time a window is generated we kill all the current timers
.doOnNext(e -> timerOn.set(false))
.blockingSubscribe(System.out::println);
}