我为响应式扩展创建了SlidingWindow()运算符,因为我想轻松地监视滚动平均值之类的事情。作为一个简单的示例,我想订阅以听取鼠标事件,但是每次发生一个事件时,我都希望接收最后三个(而不是等待每三个事件接收最后三个)。这就是为什么我发现Window重载似乎无法为我提供开箱即用的需求的原因。
这是我想出的。考虑到它频繁的List操作,我担心它可能不是最有效的解决方案:
public static IObservable<List<T>> SlidingWindow<T>(this IObservable<T> seq, int length)
{
var seed = new List<T>();
Func<List<T>, T, List<T>> accumulator = (list, arg2) =>
{
list.Add(arg2);
if (list.Count > length)
list.RemoveRange(0, (list.Count - length));
return list;
};
return seq.Scan(seed, accumulator)
.Where(list => list.Count == length);
}
可以这样称呼:
var rollingSequence = Observable.Range(1, 5).SlidingWindow().ToEnumerable();
但是,令我惊讶的是,没有收到预期的结果
1,2,3
2,3,4
3,4,5
我收到结果
2,3,4
3,4,5
3,4,5
任何见解将不胜感激!
可能一样好,而且更容易阅读:
public static IObservable<IList<T>> SlidingWindow<T>(
this IObservable<T> src,
int windowSize)
{
var feed = src.Publish().RefCount();
// (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list
return Observable.Zip(
Enumerable.Range(0, windowSize)
.Select(skip => feed.Skip(skip))
.ToArray());
}
测试装备:var source = Observable.Range(0, 10);
var query = source.SlidingWindow(3);
using(query.Subscribe(Console.WriteLine))
{
Console.ReadLine();
}
输出:
ListOf(0,1,2)
ListOf(1,2,3)
ListOf(2,3,4)
ListOf(3,4,5)
ListOf(4,5,6)
...
编辑:顺便说一句,自从不做一次就被烧死之后,我发现自己强迫.Publish().RefCount()
...我不认为这里是严格要求的,等等。>>
编辑yzorg:如果这样增加方法,您将更清楚地看到运行时行为:
public static IObservable<IList<T>> SlidingWindow<T>( this IObservable<T> src, int windowSize) { var feed = src.Publish().RefCount(); // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list return Observable.Zip( Enumerable.Range(0, windowSize) .Select(skip => { Console.WriteLine("Skipping {0} els", skip); return feed.Skip(skip); }) .ToArray()); }