[背景-Angular + .net Core WebApi,假设我正在构建一个时间管理应用程序,以跟踪您在任务上花费了多少时间->用户在前端创建任务并启动它->有一个计时器显示已用时间时间。
[主意-经过的时间来自后端。当用户通过服务在前端启动任务时,我在后端启动计时器并将经过的时间传递回前端,这样我就可以将其显示给用户。即使后端和前端之间存在连接问题,我也想显示正确的值。
图表:后端流每秒发送一次值(我向用户显示的经过时间),但出现连接故障,并且在6秒后冻结一会儿,然后发送9(“ 0:09”),这可能会使用户感到困惑(“以前是6秒,现在是9?”)。因此,我在前端发出间隔并每秒发出新值。我想每秒检查后端流是否发送了新值,如果不是,我想获取以前的值并对其进行修改,以便用户获得正确的值。
bStream => ---1---2---3---4---5---6---x---x---9
fStream => ---1---2---3---4---5---6---7---8---9
What user sees:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 (freeze) -> 00:09
What I want to user to see:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> (freeze - frontend knows there is no new value so adds 1 second to previous)
So, it should look like that:
00:01 -> 00:02 -> 00:03 -> 00:04 -> 00:05 -> 00:06 -> 00:07 -> 00:08 -> 00:09
我正在努力从哪里开始。
使用两个流进行了快速提琴,但无法找到如何找到它的方法bStream没有发出新的值。
请检查下面的代码。要模拟同步问题,请将代码v * 5
更改为v * 4
,然后计数器将在获得“后端”的值时遵循该值。
// emit every 5s
const source = interval(5000).pipe(
map(v1 => v1 + 1), // <- only for the example. starting counting from 1.
startWith(0), // <- only for the example. instant emit of 0.
map(v => v * 5), // <- every 5 seconds we get right passed amount. so it emits 0 5 10 15.
);
// emit every 1s
const secondSource = interval(1000).pipe(
delay(50), // <- we want it to be a bit later than original counter.
map(v1 => v1 + 1), // emits are 1 2 3, not from 0.
startWith(0), // instant emit of 0.
);
source.pipe( // <- it is our slow backend (it gives us an update every 5 seconds.
switchMap(v => secondSource.pipe( // if no emit from parent stream - then in 1.05 we get value from this one.
map(v1 => v + v1), // - adding offset from the second stream to the parent stream.
)),
).subscribe(v => console.log(v));
现在即使从后端有滞后,它也可以从0平稳地计数到N。
已更新
还有甚至更简单的方法,但是问题在于后端应答时它不依赖时间,而是拥有自己的1秒周期。
pipe(
bufferTime(1000), // <- collects for a second.
scan((a, b) => b.length ? a + 1 : b[0], 0), // assumes or returns.
);
这里是一种方法:
const be$ = concat(
of(1).pipe(delay(100)),
of(2).pipe(delay(100)),
of(3).pipe(delay(100)),
of(4).pipe(delay(100)),
of(5).pipe(delay(100)),
of(6).pipe(delay(100)),
of(10).pipe(delay(500)), // After freeze
of(11).pipe(delay(100)),
of(12).pipe(delay(100)),
).pipe(shareReplay({ bufferSize: 1, refCount: true, }), endWith(null));
// `skip(1)` - the `ReplaySubject` used be `shareReplay()` will give us the latest value
// and it's not needed
const beReady$ = be$.pipe(skip(1), take(1));
const fe$ = be$.pipe(
mergeMap(v => merge(
of(v),
of(v).pipe(
expand(v => timer(100).pipe(map(v1 => 1 + v))),
takeUntil(beReady$),
)
)),
distinctUntilChanged(),
filter(v => v !== null)
).subscribe(console.log)
endWith(null)
-为了在发出最后一个值(12
)时停止递归,我们需要源发出其他东西
shareReplay
-源需要共享,因为除了主订户([beReady$
)
fe$
)mergeMap(v => merge(
of(v), // Pass along the current value
of(v).pipe(
// If the `be$` does not emit in the next 100ms, send the `currentValue + 1`
// and keep doing the same until the `be$` finally emits
expand(v => timer(100).pipe(map(v1 => 1 + v))),
takeUntil(beReady$),
)
)),
[expand
就像使用mergeMap
,但:
takeUntil(beReady$)
是如何停止递归的方法