以无响应方式从mongodb集合中查询所有文档的最佳方法,没有溢出RAM

问题描述 投票:3回答:2

我想以被动方式查询集合中的所有文档。 mongodb nodejs驱动程序的collection.find()方法返回一个游标,该游标触发集合中找到的每个文档的事件。所以我做了这个:

function giant_query = (db) => {
    var req = db.collection('mycollection').find({});   
    return Rx.Observable.merge(Rx.Observable.fromEvent(req, 'data'),
                               Rx.Observable.fromEvent(req, 'end'),
                               Rx.Observable.fromEvent(req, 'close'),
                               Rx.Observable.fromEvent(req, 'readable'));
}

它将完成我想要的:为每个文档触发,这样我就可以以反应方式进行处理,如下所示:

Rx.Observable.of('').flatMap(giant_query).do(some_function).subscribe()

我可以以几十个数据包的形式查询文档,但是每次可观察流被触发时,我都必须跟踪索引号,而且我必须进行一个可观察的循环,我不知道是否是可行的还是正确的方法。

此游标的问题是我不认为它可以处理数据包。它可能会在短时间内触发所有事件,因此会淹没我的RAM。即使我使用Observable的缓冲区将某些事件缓冲在数据包中,事件和事件数据(文档)也将在RAM上等待操作。

以被动方式处理它的最佳方法是什么?

node.js mongodb rxjs reactivex
2个回答
5
投票
我省去了数据以外的事件,因为节制似乎是最主要的问题。

var cursor = db.collection('mycollection').find({}); const cursorNext = new Rx.BehaviourSubject('next'); // signal first batch then wait const nextBatch = () => { if(cursor.hasNext()) { cursorNext.next('next'); } }); cursorNext .switchMap(() => // wait for cursorNext to signal Rx.Observable.fromPromise(cursor.next()) // get a single doc .repeat() // get another .takeWhile(() => cursor.hasNext() ) // stop taking if out of data .take(batchSize) // until full batch .toArray() // combine into a single emit ) .map(docsBatch => { // do something with the batch // return docsBatch or modified doscBatch }) ... // other operators? .subscribe(x => { ... nextBatch(); });

[我正在尝试在没有mongodb的情况下对此Rx流进行测试,与此同时,这可能会给您一些想法。


0
投票
© www.soinside.com 2019 - 2024. All rights reserved.