与rxjs条件发射延迟

问题描述 投票:3回答:4

a-thousand-words

From picture to code?

如何从数据和盖茨外出时可观察到的?

  • 数据是可观察到的任何类型的例如JSON对象将被发送到远程后端
  • 盖茨是一个布尔观察到,这里的蜱对应于真实的十字架为false。例如,互联网连接,从而真正意味着使网络成为访问和错误反映了断线。
  • 出是所得可观察到的,其发射相同的数据,有时立即,有时用的延迟,这取决于前栅极。例如,我可以订阅到Out以张贴发出JSON对象到远程API,同时连接到互联网。
angular rxjs observable reactive-programming rxjs5
4个回答
2
投票

对于我个人理解,你需要data$gates$发出真实的,data$的缓冲,否则,当gates$再次发出真正的结束,所以某事像:

out$ = gates$.switchMap(x => x? data$ : data$.buffer(gates$))

假设:data$gates$是热流(参见这意味着什么这里Hot and Cold observables : are there 'hot' and 'cold' operators?)。

这不是测试,但尝试一下,让我们知道,如果它确实工作(或代码证明它就像你说:-)。逻辑看起来不错,我只是不确定再入gates$。希望外一个前内门从buffer火灾suscription $。如果做不到这一点,你会看到相应的网络中断数据的发射暂停。

好吧,如果不工作,然后用scan标准的解决方案。你追求的行为可以表示为(微小)状态机,具有两个状态:passthroughbuffering。可以实现与scan所有这些状态机。

这里去scan解决方案:https://jsfiddle.net/1znvwyzc/

const gates$ = Rx.Observable.interval(2000)
                            .map(_ => Math.random() >= 0.5)
                            .map(x => ({gates: x}))
                            .share()

const data$ = Rx.Observable.interval(500)
                           .map(_ => "data"+ _)
                           .map(x => ({data: x}))                           
                           .share()

const out$ = Rx.Observable.merge(gates$, data$).scan((acc, val) => {
  if (acc.controlState === 'passthrough'){
    if (Object.keys(val).includes('data')) {
      return {
        controlState : 'passthrough',
        bufferedData : [],
        out : val.data
      }
    }
    if (Object.keys(val).includes('gates')) {
      if (val.gates) {
        // gates passing from true to true -> no changes to perform
        return {
        controlState : 'passthrough',
        bufferedData : [],
        out : null
        }
      } else {
        // gates passing from true to false, switch control state
        return {
        controlState : 'buffered',
        bufferedData : [],
        out : null        
        }
      }      
    }
  }
  if (acc.controlState === 'buffered'){
    if (Object.keys(val).includes('data')) {
      return {
        controlState : 'buffered',
        bufferedData : (acc.bufferedData.push(val.data), acc.bufferedData),
        out : null              
      }
    }
    if (Object.keys(val).includes('gates')) {
      if (val.gates) {
        // gates from false to true -> switch control state and pass the buffered data
        return {
          controlState : 'passthrough',
          bufferedData : [],
          out : acc.bufferedData              
        }
      } else {
        // gates from false to false -> nothing to do
        return {
          controlState : 'buffered',
          bufferedData : acc.bufferedData,
          out : null                    
        }
      }
    }
  }
}, {controlState : 'passthrough', bufferedData : [], out:null})
.filter(x => x.out)
.flatMap(x => Array.isArray(x.out) ? Rx.Observable.from(x.out) : Rx.Observable.of(x.out))

out$.subscribe(_ => console.log(_))   

你可以看到这里使用完全相同的技术:How do I conditionally buffer key input based on event in RxJs


1
投票

有条件延迟数据$另一种方法是使用delayWhen()这样的:

const gate$ = new BehaviorSubject<boolean>(false);
const triggerF = _ => gate$.pipe(filter(v => v));
const out$ = data$
  .pipe(delayWhen(triggerF))              
  .subscribe( (v) => console.log(v));

// then trigger gate$, for instance:
setTimeout(() => gate$.next(true), 5000);
setTimeout(() => gate$.next(false), 10000);

0
投票
const gate$ = Rx.Observable.interval(2000)
                           .map(_ => Math.random() >= 0.5)
                           .filter(_ => _)


const data$ = Rx.Observable.interval(500)
                            .map(_ => "data"+ _)
                            .buffer(gate$)
                            .flatMap(_ => Rx.Observable.from(_))

data$.subscribe(_ => console.log(_))                          

栅极流产生随机真假的值(例如N / W是向上或向下)。我们从这个流发出只有真正的瓦莱斯

基于此流truthy值,我们缓冲我们的数据流。

看到小提琴 - fiddle。不要忘了打开浏览器控制台:)


0
投票

通过这个帖子的贡献的启发,下面似乎得到期望的行为:

const ticks$ = gates$.filter(b => b)
const crosses$ = gates$.filter(b => !b)
const tickedData$ = data$.windowToggle(ticks$, _ => crosses$.take(1)).switch()
const crossedDataBuffers$ = data$.bufferToggle(crosses$, _ => ticks$.take(1))
const crossedData$ = Rx.Observable.from(crossedDataBuffers$)
const out$ = tickedData$.merge(crossedData$)

它可能更简单,必须在https://jsfiddle.net/KristjanLaane/6kbgnp41/一出戏

© www.soinside.com 2019 - 2024. All rights reserved.