我有一个Observable,在某个时候必须将内容写入缓存-我们希望等到写入完成后再对Observable进行整个操作(出于报告目的。)>
出于测试目的,缓存写Completable看起来像这样:
Completable.create( emitter -> new Thread( () -> { try { Thread.sleep(2000); doSomething(); emitter.onComplete(); } catch (InterruptedException e) { e.printStackTrace(); } }) .start());
由于我有几次缓存写操作,所以我尝试将它们合并到一个容器类中:
public class CacheInsertionResultsTracker { private Completable cacheInsertResultsCompletable; public CacheInsertionResultsTracker() { this.cacheInsertResultsCompletable = Completable.complete(); } public synchronized void add(Completable cacheInsertResult) { this.cacheInsertResultsCompletable = this.cacheInsertResultsCompletable.mergeWith(cacheInsertResult); } public Completable getCompletable() { return this.cacheInsertResultsCompletable; } }
而且我尝试通过以下方式将其与Observable合并:
CacheInsertionResultsTracker tracker = new ...; observable .doOnNext(next->tracker.add(next.writeToCache(...))) .mergeWith(Completable.defer(()->tracker.getCompletable())) .subscribe( // on next this::logNextElement // on error this::finishWithError // on complete this::finishWithSuccess );
我如何确定在finishWithSuccess之前,doSomething已完成?问题是,每次添加新引用时,Completable引用都会更新,并且它会在mergeWith运行之后发生...
我有一个Observable,在某个时候必须将内容写入缓存-我们希望等到写入完成后再在Observable上完成整个操作(出于报告目的……)>
基于注释,您可以用ReplaySubject<Completable>
替换可填充的缓存,进行一些超时以检测不活动并观察到序列结束。
ReplaySubject<Completable> cache = ReplaySubject.create();
cache.onNext(completable);
observable.mergeWith(
cache.flatMapCompletable(v -> v)
.timeout(10, TimeUnit.MILLISECONDS, Completable.complete())
)