如何将mergeMap内部订阅限制为最后N个

问题描述 投票:1回答:1

我有一个源流从两个流合并而成。当源流发出事件时,我想调用订阅函数Meteor.subscribe并将其保持打开状态,因此我使用mergeMap。准备好订阅后,我将通过管道传输到另一个mergeMap来填充数据。在我单击100次并且内存消耗激增之前,它一直运行良好。问题是,怎么可能将mergeMap限制为concurrent: Number的前N个订阅,而不是最近的N个,例如滑动窗口?

function paginationCache$(): Observable<any> {

    return merge(this.pageParamsChanged$, this.routerParamsChanged$)
        .pipe(
            mergeMap((newParams) => {
                // First merge map subscribes to data and un subscribes when second merge map unsubscribes
                return Observable.create((observer: Subscriber<any>) => {

                    let subscription = Meteor.subscribe('my/best/data/sub', newParams,
                        () => {
                            observer.next(subscription);
                            observer.complete();
                        });
                });
            }),
            mergeMap((subscription: any) => {
                // second subscription is just populating the data

                return Observable.create((observer: Subscriber<Meteor.Error | any>) => {

                    const collection = new Mongo.Collection(subscription.collectionName);

                    const { selector, options } = this.mongoParams();

                    collection.find(selector, options).dataChanges((data) => {
                        observer.next({ data });
                    });

                    return () => {
                        subscription.stop();
                    };
                });

            })
        );
}

meteor rxjs mergemap
1个回答
0
投票

不是并发的前N个订阅:编号,而是最近的N个订阅,例如滑动窗口

如果我理解正确,您会想要这样的东西(假设N = 3:]

N = 3 Crt | 1 | 2 | 3 | Subscriptions | S1 | S2 | S3 | When Crt = 4 Crt | 2 | 3 | 4 | Subscriptions | S2 | S3 | S4 |

如果是这种情况,这就是我的解决方法:

const subscriptionsSubject = new Subject();

src$.pipe(
  mergeMap(
    data => (new Observable(s => {/* ... */ subscriptionsSubject.next(null) /* Notify about a new subscription when it's the case */ }))
      .pipe(
        takeUntil(
          subscriptionsSubject.pipe(
            take(N), // After `N` subscriptions, it will complete
            filter((_, idx) => idx === N - 1) // Do not want to complete immediately, but only when exactly `N` subscriptions have been reached
          )
        )
      )
  )
)
© www.soinside.com 2019 - 2024. All rights reserved.