我有一个资源匮乏的进程,我想将最新结果缓存一段时间。
在下面的示例中,我有可观察的
getInterval
。这应该是一个消耗资源的过程。可观察的结果通过 getSharedInterval
共享。最后一次发射会缓存 5 秒。
但我想要实现的是,
getInterval
observable 应在外部 observable unscrubscibes 时立即完成,但最新结果应在接下来的 5 秒内缓存。
这可能吗?
const getInterval = new Observable((source) => {
const interval = setInterval(() => {
source.next('next');
}, 1000);
source.add(() => {
clearInterval(interval);
});
}).pipe(
tap(() => console.log('I am running')),
finalize(() => console.log('completed'))
);
const getSharedInterval = getInterval.pipe(
share({
resetOnRefCountZero: () => timer(5000),
connector: () => new ReplaySubject(1),
resetOnComplete: true,
})
);
getSharedInterval
.pipe(
take(1),
tap(() => console.log('I dont no want anymore'))
)
.subscribe(console.log);
是的,可以通过
shareReplay()
运算符实现。它接受第二个参数,这是一个计时器,用于确定缓存数据的保留时间。
但是,缓存过期后,未来的订阅者只会收到
null
。因此,当缓存过期时,它需要一些额外的代码来“替换”可观察的。
let reusedObservable$;
const replaceInterval = () => {
reusedObservable$ = getInterval.pipe(
take(1),
shareReplay(1, 5000)
);
return reusedObservable$;
}
export function getSharedInterval() {
return replaceInterval().pipe(
first(
null,
defer(() => replaceInterval())
),
mergeMap((d) => (isObservable(d) ? d : of(d)))
);
}
通过此设置,第一个订阅者会触发发出第一个值并缓存 5 秒。未来的订阅者将在 5 秒内获得缓存的值。
订阅者在 5 秒后将导致
null
被发出,这会调用返回的替换可观察值,从而重新启动缓存周期。
归功于这篇文章我在研究这个答案时发现的。