对抛出错误的自定义观察者应用 retryWhen,不起作用

问题描述 投票:0回答:1

我正在使用这个自定义

class
以反应式方式连接到MongoDB:

function RxMongodb(driver){
    driver == null ? this.driver = require('mongodb').MongoClient : this.driver = driver.MongoClient;
}

RxMongodb.prototype.connect = function(uri){
    return Rx.Observable.create((observer)=>{
        if (dbInstance != null){
            console.log('already connected, repassing db instance');
            observer.next(dbInstance);
        } else {
            console.log('connecting for the first time');
            this.driver.connect(uri, (err, db)=>{
                if (err) {
                    console.log('rx mongo error! ' + err);
                    observer.error(err);
                } else {
                    dbInstance = db;
                    observer.next(dbInstance);
                    observer.complete();
                }
            });
        }
    });
}

有时我的mongodb加载需要时间(和这段代码一起调用),所以我想等一下再试。我正在使用

retryWhen
来做到这一点:

Rx.Observable.fromEvent(emitter, 'go').
flatMap(()=>rx_mongo.connect(mongo_parameters.url)).                   
retryWhen(errors=>errors.delay(2000)).take(10).
... 

这里

rx_mongo
RxMongbdb
的一个实例。我可以在控制台中看到发生了错误,所以它调用了
observer.error(err)
,但它永远不会重试任何东西。

另外,我看到很多文档使用 onNext、onError、onCompleted 而不是 next、error、completed。哪个是现代的?是我的吧?

更新

我认为这可能与 error() 有关。调用 error() 和抛出错误有什么区别?

node.js mongodb rxjs reactivex
1个回答
2
投票

您将 retryWhen 设置在错误的位置......您必须将其放在 mongo connect observable 上。虽然,mongo connect Observable 必须是热的,并且不会在每次事件“go”发出时都被触发。只有当 retryWhen + 10 个意图 finished 时才需要再次触发。你也没有设置好 take(10),你不需要重试 9 次来执行 10 次连接意图。

我编辑了我的帖子并添加了一个例子。 mousedown 事件模拟您的 go 事件。查看 share() 运算符,重要的是,对于我执行的更多点击,源 observable 被共享而不是再次触发连接。

//emit value every 1s
const source = Rx.Observable.fromEvent(document, 'mousedown');
const hotMongoConnect$ = Rx.Observable.interval(1000) 
                                 .map(val => {throw val;}) // mocks rxMongo.connect
                                 .retryWhen((errors)=> {
                                     return errors
                                      .scan(function(errorCount, err) {
                                        if(errorCount >= 9) {
                                          throw errorCount + 1;
                                        }
                                        return errorCount + 1;
                                       }, 0)
                                       .do(errorCount => console.log('connect failed. retry nº: ', errorCount))
                                       .delayWhen(val => Rx.Observable.timer(2000));
                                    })
                                    .share();

const example = source.flatMap(() => hotMongoConnect$)
const subscribe = example.subscribe(val => console.log(val), error => console.error('Error connectiong to server, nº of intents:', error));
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.umd.js"></script>

希望这有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.