我想去除一个流 - 但前提是源值与以前相同。我如何用RxJS 5做到这一点?
如果值相同并且我在指定的时间窗口内发出它,我不想发出值。我应该能够使用流中的值 - 或者比较distinctUntilChanged的比较函数。
如果不创建自己的运算符,我不知道有什么方法可以做到这一点,因为你需要保持某种状态(最后看到的值)。
一种方式看起来像这样:
// I named this debounceDistinctUntilChanged but that might not be
// the best name. Name it whatever you think makes sense!
function debounceDistinctUntilChanged(delay) {
const source$ = this;
return new Observable(observer => {
// Using an object as the default value
// so that the first time we check it
// if its the same its guaranteed to be false
// because every object has a different identity.
// Can't use null or undefined because source may
// emit these!
let lastSeen = {};
return source$
.debounce(value => {
// If the last value has the same identity we'll
// actually debounce
if (value === lastSeen) {
return Observable.timer(delay);
} else {
lastSeen = value;
// This will complete() right away so we don't actually debounce/buffer
// it at all
return Observable.empty();
}
})
.subscribe(observer);
});
}
现在您看到了一个实现,您可能(或可能不会)发现它与您的期望不同。你的描述实际上遗漏了某些细节,比如它应该只是你在去抖时间框架内保留的最后一个值,或者它是否是一组 - 基本上是distinctUntilChanged
与distinct
。我假设后者。
无论哪种方式,这有望为您提供一个起点,并揭示创建自定义运算符是多么容易。内置运算符绝对不能为所有内容提供解决方案,因此任何足够高级的应用程序都需要自己制作(或者在不抽象的情况下内联强制执行内容,这也很好)。
然后,您可以将此运算符放在Observable原型上:
Observable.prototype.debounceDistinctUntilChanged = debounceDistinctUntilChanged;
// later
source$
.debounceDistinctUntilChanged(400)
.subscribe(d => console.log(d));
或者通过使用let
:
// later
source$
.let(source$ => debounceDistinctUntilChanged.call($source, 400))
.subscribe(d => console.log(d));
如果可以,我建议您真正了解我的代码所做的事情,以便将来您能够轻松制作自己的解决方案。
这取决于你想要做什么;当我尝试做类似的事情时,我遇到了这个问题,基本上是去抖动但是对于不同的对象值有不同的去抖动。
从jayphelps尝试解决方案后,我无法按照我的意愿行事。经过多次来回,原来有一种简单的方法可以做到:groupby。
const priceUpdates = [
{bid: 10, id: 25},
{bid: 20, id: 30},
{bid: 11, id: 25},
{bid: 21, id: 30},
{bid: 25, id: 30}
];//emit each person
const source = Rx.Observable.from(priceUpdates);
//group by age
const example = source
.groupBy(bid => bid.id)
.mergeMap(group$ => group$.debounceTime(500))
const subscribe = example.subscribe(val => console.log(val));
输出:
[object Object] {
bid: 11,
id: 25
}
[object Object] {
bid: 25,
id: 30
}
Jsbin:http://jsbin.com/savahivege/edit?js,console
此代码将根据出价ID进行分组并对其进行去抖动,因此仅为每个代码发送最后一个值。
更新rxjs 6:
source$
.pipe(
// debounceTime(300), optionally un-comment this to add debounce
distinctUntilChanged(),
)
.subscribe(v => console.log(v))
当源“值”发生变化或者自上次发射后经过一些“延迟”时间(即使'值'未更改)时,此rxjs6 +运算符将发出:
export function throttleUntilChanged(delay: number) {
return (source: Observable<any>) => {
return new Observable(observer => {
let lastSeen = {};
let lastSeenTime = 0;
return source
.pipe(
flatMap((value: any) => {
const now = Date.now();
if (value === lastSeen && (now - lastSeenTime) < delay ) {
return empty();
} else {
lastSeen = value;
lastSeenTime = now;
return of(value);
}
})
)
.subscribe(observer);
});
};
}