RxJS 缓冲直到流为真

问题描述 投票:0回答:4

我想缓冲流,直到流的谓词为真:

例如,数字可以除以五:

//emit value every 1 second
const oneSecondInterval = Rx.Observable.interval(1000);
//return an observable that checks if the number is divided by zero
const fiveSecondInterval = () => oneSecondInterval.filter(number => number % 5 === 0);

const bufferWhenExample = oneSecondInterval.bufferWhen(fiveSecondInterval);
//log values
const subscribe = bufferWhenExample.subscribe(val => console.log('Emitted Buffer: ', val));

输出

"Emitted Buffer: "
[]
"Emitted Buffer: "
[0, 1]
"Emitted Buffer: "
[2]
"Emitted Buffer: "
[3]
"Emitted Buffer: "
[4]
"Emitted Buffer: "
[5]
"Emitted Buffer: "
[6]
"Emitted Buffer: "
[7]
"Emitted Buffer: "
[8]
"Emitted Buffer: "
[9]
"Emitted Buffer: "
[10]

我想要的:

"Emitted Buffer: "
[0]
"Emitted Buffer: "
[1,2,3,4,5]
"Emitted Buffer: "
[6,7,8,9,10]

但这不起作用。为什么?

演示:http://jsbin.com/durerimiju/1/edit?js,console


更新

这已经差不多好了

const oneSecondInterval = Rx.Observable.interval(1000);
const fiveSecondInterval = oneSecondInterval.filter(time => time % 5 === 0);
const bufferWhenExample = oneSecondInterval.buffer(fiveSecondInterval);

bufferWhenExample.subscribe(console.log)

唯一的问题是它会发射

[]
[0,1,2,3,4]
[5,6,7,8,9]

相反,我想要

[0]
[1,2,3,4,5]
[6,7,8,9,10]
javascript rxjs rxjs5
4个回答
3
投票

这是似乎对我有用的解决方案(在 Typescript 中):

import { interval, OperatorFunction } from 'rxjs';
import { buffer, delay, filter, share, tap } from 'rxjs/operators';

export function bufferUntil<T>(predicate:(value:T) => boolean):OperatorFunction<T, T[]>
{
    return function(source)
    {
        const share$ = source.pipe(share());
        const until$ = share$.pipe(filter(predicate), delay(0));
        return share$.pipe(buffer(until$));
    };
}

interval(1000).pipe(
    tap(console.log),
    bufferUntil(value => value % 5 === 0),
    tap(console.log)
).subscribe();

这会产生以下输出:

0
[0] // this emits immediately after the previous log
1
2
3
4
5
[1, 2, 3, 4, 5] // this emits immediately after the previous log
6
7
8
9
10
[6, 7, 8, 9, 10] // this emits immediately after the previous log

如果有人有更好的解决方案,或者我的实现在某种程度上是危险的或不正确的,我很乐意听到它,因为我自己需要这个功能。


2
投票

bufferWhen 运算符获取一个关闭可观察工厂函数,该函数在每个缓冲区启动时创建可观察对象。在您的情况下,对于每次缓冲迭代,它都会创建一个始终以 0 开头的新间隔可观察值。

您可以简单地使用 buffer 运算符来获取关闭的可观察值,它将按预期工作:

const oneSecondInterval = Rx.Observable.interval(1000);
const fiveSecondInterval = oneSecondInterval.filter((number) => number % 5 === 0);
oneSecondInterval.buffer(fiveSecondInterval);

我编辑了你的 jsbin here


1
投票

interval
observable 不会发出它消失的次数计数,它只是发出
undefined
。因此,当您执行
.filter(number => number % 5 === 0)
时,谓词始终返回 false。

要记录已发出的值的数量,您可以使用 .scan 运算符

const fiveSecondInterval = () => 
    oneSecondInterval
        .scan(count => count + 1, 0)
        .filter(count => count % 5 === 0);

0
投票

解决@michalc 在 @TroyWeber 的答案的评论中提出的问题。

这是一个非常简单的函数(我相信)在所有情况下都有效。它不会做任何花哨的事情或利用其他操作员,但可以完成工作!

import { from, Observable, type OperatorFunction } from 'rxjs'

function bufferUntil<T>(predicate: (value: T) => boolean): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => new Observable<T[]>(observer => {
    let buffer: T[] = []

    return source.subscribe({
      next(value) {
        buffer.push(value)

        if (predicate(value)) {
          observer.next(buffer)
          buffer = []
        }
      },
      error(err) { observer.error(err) },
      complete() { observer.complete() },
    })
  })
}

from([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]).pipe(
  bufferUntil(n => n % 5 === 0),
).subscribe(console.log)
© www.soinside.com 2019 - 2024. All rights reserved.