我有一个源流从两个流合并而成。当源流发出事件时,我想调用订阅函数Meteor.subscribe
并将其保持打开状态,因此我使用mergeMap
。准备好订阅后,我将通过管道传输到另一个mergeMap
来填充数据。在我单击100次并且内存消耗激增之前,它一直运行良好。问题是,怎么可能将mergeMap限制为concurrent: Number
的前N个订阅,而不是最近的N个,例如滑动窗口?
function paginationCache$(): Observable<any> {
return merge(this.pageParamsChanged$, this.routerParamsChanged$)
.pipe(
mergeMap((newParams) => {
// First merge map subscribes to data and un subscribes when second merge map unsubscribes
return Observable.create((observer: Subscriber<any>) => {
let subscription = Meteor.subscribe('my/best/data/sub', newParams,
() => {
observer.next(subscription);
observer.complete();
});
});
}),
mergeMap((subscription: any) => {
// second subscription is just populating the data
return Observable.create((observer: Subscriber<Meteor.Error | any>) => {
const collection = new Mongo.Collection(subscription.collectionName);
const { selector, options } = this.mongoParams();
collection.find(selector, options).dataChanges((data) => {
observer.next({ data });
});
return () => {
subscription.stop();
};
});
})
);
}
不是并发的前N个订阅:编号,而是最近的N个订阅,例如滑动窗口
如果我理解正确,您会想要这样的东西(假设N = 3
:]
N = 3 Crt | 1 | 2 | 3 | Subscriptions | S1 | S2 | S3 | When Crt = 4 Crt | 2 | 3 | 4 | Subscriptions | S2 | S3 | S4 |
如果是这种情况,这就是我的解决方法:
const subscriptionsSubject = new Subject(); src$.pipe( mergeMap( data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ })) .pipe( takeUntil( subscriptionsSubject.pipe( take(N), // After `N` subscriptions, it will complete filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached ) ) ) ) )