RxJava:将Observable与Completable合并不起作用

问题描述 投票:1回答:1

我有一个Observable,在某个时候必须将内容写入缓存-我们希望等到写入完成后再对Observable进行整个操作(出于报告目的。)>

出于测试目的,缓存写Completable看起来像这样:

   Completable.create(
                  emitter ->
                      new Thread(
                              () -> {
                                try {
                                  Thread.sleep(2000);
                                  doSomething();
                                  emitter.onComplete();
                                } catch (InterruptedException e) {
                                  e.printStackTrace();
                                }
                              })
                          .start());

由于我有几次缓存写操作,所以我尝试将它们合并到一个容器类中:

public class CacheInsertionResultsTracker {

  private Completable cacheInsertResultsCompletable;

  public CacheInsertionResultsTracker() {
    this.cacheInsertResultsCompletable = Completable.complete();
  }

  public synchronized void add(Completable cacheInsertResult) {
    this.cacheInsertResultsCompletable = this.cacheInsertResultsCompletable.mergeWith(cacheInsertResult);
  }

  public Completable getCompletable() {
    return this.cacheInsertResultsCompletable;
  }
}

而且我尝试通过以下方式将其与Observable合并:

CacheInsertionResultsTracker tracker = new ...;
    observable
        .doOnNext(next->tracker.add(next.writeToCache(...)))
        .mergeWith(Completable.defer(()->tracker.getCompletable()))
        .subscribe(
            // on next
            this::logNextElement
            // on error
            this::finishWithError
            // on complete
            this::finishWithSuccess
            );

我如何确定在finishWithSuccess之前,doSomething已完成?问题是,每次添加新引用时,Completable引用都会更新,并且它会在mergeWith运行之后发生...

我有一个Observable,在某个时候必须将内容写入缓存-我们希望等到写入完成后再在Observable上完成整个操作(出于报告目的……)>

java concurrency reactive-programming rx-java2
1个回答
0
投票

基于注释,您可以用ReplaySubject<Completable>替换可填充的缓存,进行一些超时以检测不活动并观察到序列结束。

 ReplaySubject<Completable> cache = ReplaySubject.create();

 cache.onNext(completable);

 observable.mergeWith(
     cache.flatMapCompletable(v -> v)
          .timeout(10, TimeUnit.MILLISECONDS, Completable.complete())
 )
© www.soinside.com 2019 - 2024. All rights reserved.