警告:RxJS newb在这里。
这是我的挑战:
onUnlink$
观察到时......onAdd$
观察值捕获值,最多1秒(我将这个分区称为onAddBuffer$
)。doc$
observable)以获取我们将用于匹配其中一个onAdd$
值的模型onAddBuffer$
observable中的一个值与doc$
值匹配,则不发出onAddBuffer$
observable中没有值与doc$
值匹配,或者onAddBuffer$
observable从未发出,则发出doc$
值这是我最好的猜测:
// for starters, concatMap doesn't seem right -- I want a whole new stream
const docsToRemove$ = onUnlink$.concatMap( unlinkValue => {
const doc$ = Rx.Observable.fromPromise( db.File.findOne({ unlinkValue }) )
const onAddBuffer$ = onAdd$
.buffer( doc$ ) // capture events while fetching from db -- not sure about this
.takeUntil( Rx.Observable.timer(1000) );
// if there is a match, emit nothing. otherwise wait 1 second and emit doc
return doc$.switchMap( doc =>
Rx.Observable.race(
onAddBuffer$.single( added => doc.attr === added.attr ).mapTo( Rx.Observable.empty() ),
Rx.Observable.timer( 1000 ).mapTo( doc )
)
);
});
docsToRemove$.subscribe( doc => {
// should only ever be invoked (with doc -- the doc$ value) 1 second
// after `onUnlink$` emits, when there are no matching `onAdd$`
// values within that 1 second window.
})
这总是发出EmptyObservable
。也许是因为当没有比赛时single
似乎会发出undefined
,而且我认为在没有比赛时它根本不会发出?同样的事情发生在find
。
如果我将single
改为filter
,那么什么都不会发出。
仅供参考:这是一个带文件系统事件的重命名方案 - 如果add
事件在unlink
事件的1秒内跟随并且发出的文件哈希匹配,则不执行任何操作,因为它是rename
。否则它是一个真正的unlink
,它应该发出要删除的数据库文档。
这是我猜你怎么做到这一点:
onUnlink$.concatMap(unlinkValue => {
const doc$ = Rx.Observable.fromPromise(db.File.findOne({ unlinkValue })).share();
const bufferDuration$ = Rx.Observable.race(Rx.Observable.timer(1000), doc$);
const onAddBuffer$ = onAdd$.buffer(bufferDuration$);
return Observable.forkJoin(onAddBuffer$, doc$)
.map(([buffer, docResponse]) => { /* whatever logic you need here */ });
});
single()
运算符有点棘手,因为它只在源Observable完成后才发出匹配谓词函数的项(或者在有两个项或没有匹配项时发出错误)。
race()
也很棘手。如果其中一个源Observable完成并且没有发出任何值,则race()
将完成并且不会发出任何内容。我前段时间报道了这一点,这是正确的行为,请参阅https://github.com/ReactiveX/rxjs/issues/2641。
我猜你的代码出了问题。
另请注意,.mapTo(Rx.Observable.empty())
会将每个值映射到Observable的实例中。如果要忽略所有值,可以使用filter(() => false)
或ignoreElements()
运算符。