我有无尽的事件流,我需要将它们限制为5,让其余部分暂停3秒
因此需要每5个电话后延迟一次
from([ 1,2,3,4,5,6,7,8,9,11,12,13,14,15,16,17,18,19,21,22,23,24,25,26,27,28 ])
.pipe(
// To demonstrate 1 after 1 values stream we use concatMap
// we return new Observalbe via of operator
// we pipe the delay for each element based on the index value we passed
// in our concatMap
concatMap((x,i) => of(x).pipe(
delayWhen((x) => {
console.log("im index: " + i);
// Not the first element, and every 5th element
return i !== 0 && i % 5 === 0 ? timer(3000): timer(0)})
))
)
.subscribe(x => console.log(x))
// Output: 1,2,3,4,5 ...delay 3s.... 6,7,8,9,10 ...delay 3s...
您可以在我制作的this stackblitz中看到。
const stream = range(0, 100) // create dataset
.pipe(
bufferCount(5), // slice data into chunks
concatMap( // get this chunk
(msg) => of(msg).pipe(
delay(3000) // and emit every three seconds
))
)
stream.subscribe(item => console.log(item));