如何使用 RxJs 缓存资源匮乏的进程?

问题描述 投票:0回答:1

我有一个资源匮乏的进程,我想将最新结果缓存一段时间。

在下面的示例中,我有可观察的

getInterval
。这应该是一个消耗资源的过程。可观察的结果通过
getSharedInterval
共享。最后一次发射会缓存 5 秒。

但我想要实现的是,

getInterval
observable 应在外部 observable unscrubscibes 时立即完成,但最新结果应在接下来的 5 秒内缓存。

这可能吗?

const getInterval = new Observable((source) => {
  const interval = setInterval(() => {
    source.next('next');
  }, 1000);

  source.add(() => {
    clearInterval(interval);
  });
}).pipe(
  tap(() => console.log('I am running')),
  finalize(() => console.log('completed'))
);

const getSharedInterval = getInterval.pipe(
  share({
    resetOnRefCountZero: () => timer(5000),
    connector: () => new ReplaySubject(1),
    resetOnComplete: true,
  })
);

getSharedInterval
  .pipe(
    take(1),
    tap(() => console.log('I dont no want anymore'))
  )
  .subscribe(console.log);
caching rxjs
1个回答
0
投票

是的,可以通过

shareReplay()
运算符实现。它接受第二个参数,这是一个计时器,用于确定缓存数据的保留时间。

但是,缓存过期后,未来的订阅者只会收到

null
。因此,当缓存过期时,它需要一些额外的代码来“替换”可观察的。

let reusedObservable$;
const replaceInterval = () => {
  reusedObservable$ = getInterval.pipe(
    take(1),
    shareReplay(1, 5000)
  );
  return reusedObservable$;
}

export function getSharedInterval() {
  return replaceInterval().pipe(
    first(
      null,
      defer(() => replaceInterval())
    ),
    mergeMap((d) => (isObservable(d) ? d : of(d)))
  );
}

通过此设置,第一个订阅者会触发发出第一个值并缓存 5 秒。未来的订阅者将在 5 秒内获得缓存的值。

订阅者在 5 秒后将导致

null
被发出,这会调用返回的替换可观察值,从而重新启动缓存周期。

归功于这篇文章我在研究这个答案时发现的。

© www.soinside.com 2019 - 2024. All rights reserved.