在Rx中实现滑动窗口的麻烦

问题描述 投票:10回答:4

我为响应式扩展创建了SlidingWindow()运算符,因为我想轻松地监视滚动平均值之类的事情。作为一个简单的示例,我想订阅以听取鼠标事件,但是每次发生一个事件时,我都希望接收最后三个(而不是等待每三个事件接收最后三个)。这就是为什么我发现Window重载似乎无法为我提供开箱即用的需求的原因。

这是我想出的。考虑到它频繁的List操作,我担心它可能不是最有效的解决方案:

public static IObservable<List<T>> SlidingWindow<T>(this IObservable<T> seq, int length)
{
    var seed = new List<T>();

    Func<List<T>, T, List<T>> accumulator = (list, arg2) =>
    {
        list.Add(arg2);

        if (list.Count > length)
            list.RemoveRange(0, (list.Count - length));

        return list;
    };

    return seq.Scan(seed, accumulator)
                .Where(list => list.Count == length);
}

可以这样称呼:

var rollingSequence = Observable.Range(1, 5).SlidingWindow().ToEnumerable();

但是,令我惊讶的是,没有收到预期的结果

1,2,3
2,3,4
3,4,5

我收到结果

2,3,4
3,4,5
3,4,5

任何见解将不胜感激!

system.reactive
4个回答
4
投票
尝试一下-我不得不坐下来考虑一下它的相对性能,但是它[

可能一样好,而且更容易阅读:

public static IObservable<IList<T>> SlidingWindow<T>( this IObservable<T> src, int windowSize) { var feed = src.Publish().RefCount(); // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list return Observable.Zip( Enumerable.Range(0, windowSize) .Select(skip => feed.Skip(skip)) .ToArray()); } 测试装备:
var source = Observable.Range(0, 10);
var query = source.SlidingWindow(3);
using(query.Subscribe(Console.WriteLine))
{               
    Console.ReadLine();
}

输出:

ListOf(0,1,2)
ListOf(1,2,3)
ListOf(2,3,4)
ListOf(3,4,5)
ListOf(4,5,6)
...

编辑:顺便说一句,自从不做一次就被烧死之后,我发现自己强迫.Publish().RefCount() ...我不认为这里是严格要求的,等等。>>

编辑yzorg:

如果这样增加方法,您将更清楚地看到运行时行为:

public static IObservable<IList<T>> SlidingWindow<T>( this IObservable<T> src, int windowSize) { var feed = src.Publish().RefCount(); // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list return Observable.Zip( Enumerable.Range(0, windowSize) .Select(skip => { Console.WriteLine("Skipping {0} els", skip); return feed.Skip(skip); }) .ToArray()); }


12
投票

6
投票

0
投票
© www.soinside.com 2019 - 2024. All rights reserved.