合并rxjs流,检测更改并返回一个值

问题描述 投票:1回答:2

[背景-Angular + .net Core WebApi,假设我正在构建一个时间管理应用程序,以跟踪您在任务上花费了多少时间->用户在前端创建任务并启动它->有一个计时器显示已用时间时间。

[主意-经过的时间来自后端。当用户通过服务在前端启动任务时,我在后端启动计时器并将经过的时间传递回前端,这样我就可以将其显示给用户。即使后端和前端之间存在连接问题,我也想显示正确的值。

图表:后端流每秒发送一次值(我向用户显示的经过时间),但出现连接故障,并且在6秒后冻结一会儿,然后发送9(“ 0:09”),这可能会使用户感到困惑(“以前是6秒,现在是9?”)。因此,我在前端发出间隔并每秒发出新值。我想每秒检查后端流是否发送了新值,如果不是,我想获取以前的值并对其进行修改,以便用户获得正确的值。

bStream => ---1---2---3---4---5---6---x---x---9
fStream => ---1---2---3---4---5---6---7---8---9

What user sees:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 (freeze) -> 00:09

What I want to user to see:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> (freeze - frontend knows there is no new value so adds 1 second to previous)
So, it should look like that:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> 00:07 -> 00:08 -> 00:09

我正在努力从哪里开始。

使用两个流进行了快速提琴,但无法找到如何找到它的方法bStream没有发出新的值。

https://stackblitz.com/edit/typescript-yookbj

javascript typescript .net-core rxjs ngrx
2个回答
2
投票

请检查下面的代码。要模拟同步问题,请将代码v * 5更改为v * 4,然后计数器将在获得“后端”的值时遵循该值。

// emit every 5s
const source = interval(5000).pipe(
    map(v1 => v1 + 1), // <- only for the example. starting counting from 1.
    startWith(0), // <- only for the example. instant emit of 0.

    map(v => v * 5), // <- every 5 seconds we get right passed amount. so it emits 0 5 10 15.
);
// emit every 1s
const secondSource = interval(1000).pipe(
    delay(50), // <- we want it to be a bit later than original counter.
    map(v1 => v1 + 1), // emits are 1 2 3, not from 0.
    startWith(0), // instant emit of 0.
);

source.pipe( // <- it is our slow backend (it gives us an update every 5 seconds.
    switchMap(v => secondSource.pipe( // if no emit from parent stream - then in 1.05 we get value from this one.
        map(v1 => v + v1), // - adding offset from the second stream to the parent stream.
    )),
).subscribe(v => console.log(v));

现在即使从后端有滞后,它也可以从0平稳地计数到N。

已更新

还有甚至更简单的方法,但是问题在于后端应答时它不依赖时间,而是拥有自己的1秒周期。

pipe(
  bufferTime(1000), // <- collects for a second.
  scan((a, b) => b.length ? a + 1 : b[0], 0), // assumes or returns.
);

1
投票

这里是一种方法:

const be$ = concat(
  of(1).pipe(delay(100)),
  of(2).pipe(delay(100)),
  of(3).pipe(delay(100)),
  of(4).pipe(delay(100)),
  of(5).pipe(delay(100)),
  of(6).pipe(delay(100)),

  of(10).pipe(delay(500)), // After freeze
  of(11).pipe(delay(100)),
  of(12).pipe(delay(100)),
).pipe(shareReplay({ bufferSize: 1, refCount: true, }), endWith(null));

// `skip(1)` - the `ReplaySubject` used be `shareReplay()` will give us the latest value
// and it's not needed
const beReady$ = be$.pipe(skip(1), take(1));

const fe$ = be$.pipe(
  mergeMap(v => merge(
    of(v),
    of(v).pipe(
      expand(v => timer(100).pipe(map(v1 => 1 + v))),
      takeUntil(beReady$),
    )
  )),
  distinctUntilChanged(),
  filter(v => v !== null)
).subscribe(console.log)

endWith(null)-为了在发出最后一个值(12)时停止递归,我们需要源发出其他东西

shareReplay-源需要共享,因为除了主订户([beReady$

之外还将有另一个订户(fe$
mergeMap(v => merge(
  of(v), // Pass along the current value
  of(v).pipe(
    // If the `be$` does not emit in the next 100ms, send the `currentValue + 1`
    // and keep doing the same until the `be$` finally emits
    expand(v => timer(100).pipe(map(v1 => 1 + v))),
    takeUntil(beReady$),
  )
)),

[expand就像使用mergeMap,但:

  • 它将传递内部值
  • 它将基于最后一个内部值创建另一个内部可观察的对象;因此,它是递归的
  • takeUntil(beReady$)是如何停止递归的方法

StackBlitz

© www.soinside.com 2019 - 2024. All rights reserved.