如何从数据和盖茨外出时可观察到的?
对于我个人理解,你需要data$
时gates$
发出真实的,data$
的缓冲,否则,当gates$
再次发出真正的结束,所以某事像:
out$ = gates$.switchMap(x => x? data$ : data$.buffer(gates$))
假设:data$
,gates$
是热流(参见这意味着什么这里Hot and Cold observables : are there 'hot' and 'cold' operators?)。
这不是测试,但尝试一下,让我们知道,如果它确实工作(或代码证明它就像你说:-)。逻辑看起来不错,我只是不确定再入gates$
。希望外一个前内门从buffer
火灾suscription $。如果做不到这一点,你会看到相应的网络中断数据的发射暂停。
好吧,如果不工作,然后用scan
标准的解决方案。你追求的行为可以表示为(微小)状态机,具有两个状态:passthrough
和buffering
。可以实现与scan
所有这些状态机。
这里去scan
解决方案:https://jsfiddle.net/1znvwyzc/
const gates$ = Rx.Observable.interval(2000)
.map(_ => Math.random() >= 0.5)
.map(x => ({gates: x}))
.share()
const data$ = Rx.Observable.interval(500)
.map(_ => "data"+ _)
.map(x => ({data: x}))
.share()
const out$ = Rx.Observable.merge(gates$, data$).scan((acc, val) => {
if (acc.controlState === 'passthrough'){
if (Object.keys(val).includes('data')) {
return {
controlState : 'passthrough',
bufferedData : [],
out : val.data
}
}
if (Object.keys(val).includes('gates')) {
if (val.gates) {
// gates passing from true to true -> no changes to perform
return {
controlState : 'passthrough',
bufferedData : [],
out : null
}
} else {
// gates passing from true to false, switch control state
return {
controlState : 'buffered',
bufferedData : [],
out : null
}
}
}
}
if (acc.controlState === 'buffered'){
if (Object.keys(val).includes('data')) {
return {
controlState : 'buffered',
bufferedData : (acc.bufferedData.push(val.data), acc.bufferedData),
out : null
}
}
if (Object.keys(val).includes('gates')) {
if (val.gates) {
// gates from false to true -> switch control state and pass the buffered data
return {
controlState : 'passthrough',
bufferedData : [],
out : acc.bufferedData
}
} else {
// gates from false to false -> nothing to do
return {
controlState : 'buffered',
bufferedData : acc.bufferedData,
out : null
}
}
}
}
}, {controlState : 'passthrough', bufferedData : [], out:null})
.filter(x => x.out)
.flatMap(x => Array.isArray(x.out) ? Rx.Observable.from(x.out) : Rx.Observable.of(x.out))
out$.subscribe(_ => console.log(_))
你可以看到这里使用完全相同的技术:How do I conditionally buffer key input based on event in RxJs
有条件延迟数据$另一种方法是使用delayWhen()这样的:
const gate$ = new BehaviorSubject<boolean>(false);
const triggerF = _ => gate$.pipe(filter(v => v));
const out$ = data$
.pipe(delayWhen(triggerF))
.subscribe( (v) => console.log(v));
// then trigger gate$, for instance:
setTimeout(() => gate$.next(true), 5000);
setTimeout(() => gate$.next(false), 10000);
const gate$ = Rx.Observable.interval(2000)
.map(_ => Math.random() >= 0.5)
.filter(_ => _)
const data$ = Rx.Observable.interval(500)
.map(_ => "data"+ _)
.buffer(gate$)
.flatMap(_ => Rx.Observable.from(_))
data$.subscribe(_ => console.log(_))
栅极流产生随机真假的值(例如N / W是向上或向下)。我们从这个流发出只有真正的瓦莱斯
基于此流truthy值,我们缓冲我们的数据流。
看到小提琴 - fiddle。不要忘了打开浏览器控制台:)
通过这个帖子的贡献的启发,下面似乎得到期望的行为:
const ticks$ = gates$.filter(b => b)
const crosses$ = gates$.filter(b => !b)
const tickedData$ = data$.windowToggle(ticks$, _ => crosses$.take(1)).switch()
const crossedDataBuffers$ = data$.bufferToggle(crosses$, _ => ticks$.take(1))
const crossedData$ = Rx.Observable.from(crossedDataBuffers$)
const out$ = tickedData$.merge(crossedData$)
它可能更简单,必须在https://jsfiddle.net/KristjanLaane/6kbgnp41/一出戏