获取 IObservable 中的前一个元素,无需重新评估序列

问题描述 投票:0回答:6

IObservable
序列中(在 .NET 的响应式扩展中),我想获取先前元素和当前元素的值,以便可以比较它们。我在网上找到了一个类似于下面的示例,它完成了任务:

sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur })

它工作得很好,只是它对序列进行了两次评估,这是我想避免的。您可以看到使用以下代码对其进行了两次评估:

var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence"));
debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe();

输出显示的调试行数量是序列中元素数量的两倍。

我明白为什么会发生这种情况,但到目前为止我还没有找到一种不两次评估序列的替代方案。如何仅通过一次序列评估将上一个和当前的结果结合起来?

c# system.reactive
6个回答
33
投票

我认为有一个更好的解决方案,即使用 Observable.Scan 并避免双重订阅:

public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}

我已将其写在我的博客上:http://www.zerobugbuild.com/?p=213

附录

进一步的修改允许您通过使用结果选择器更干净地处理任意类型:

public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
    this IObservable<TSource> source,
    Func<TSource, TSource, TResult> resultSelector)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (previous, current) => Tuple.Create(previous.Item2, current))
        .Select(t => resultSelector(t.Item1, t.Item2));
}

5
投票

@James World 附录对我来说看起来很棒,如果不是因为

Tuple<>
,我几乎总是不喜欢它:“.Item1 是前一个吗?还是当前的?我不记得了。第一个参数是什么选择器,是前一项吗?”。

对于这一部分,我喜欢@dcstraw 对专用

ItemWithPrevious<T>
的定义。因此,您可以将两者放在一起(希望我没有将以前的与当前的混淆)并进行一些重命名和设施:

public static class ObservableExtensions
{
    public static IObservable<SortedPair<TSource>> CombineWithPrevious<TSource>(
        this IObservable<TSource> source, 
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);

        return source.Scan(seed,
            (acc, current) => SortedPair.Create(current, acc.Current));
    }

    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
        this IObservable<TSource> source,
        Func<SortedPair<TSource>, TResult> resultSelector,
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);

        return source
            .Scan(seed,
                (acc, current) => SortedPair.Create(current, acc.Current))
            .Select(p => resultSelector(p));
    }
}

public class SortedPair<T>
{
    public SortedPair(T current, T previous)
    {
        Current = current;
        Previous = previous;
    }

    public SortedPair(T current) : this(current, default(T)) { }

    public SortedPair() : this(default(T), default(T)) { }

    public T Current;
    public T Previous;
}

public class SortedPair
{
    public static SortedPair<T> Create<T>(T current, T previous)
    {
        return new SortedPair<T>(current, previous);
    }

    public static SortedPair<T> Create<T>(T current)
    {
        return new SortedPair<T>(current);
    }

    public static SortedPair<T> Create<T>()
    {
        return new SortedPair<T>();
    }
}

3
投票

评估两次是冷可观察值的指标。您可以使用 .Publish() 将其转为热门:

var pub = sequence.Publish();
pub.Zip(pub.Skip(1), (...
pub.Connect();

0
投票

如果您只需要在订阅期间访问前一个元素,这可能是最简单的方法。 (我确信有更好的方法,也许是 IObservable 上的缓冲区运算符?目前文档非常稀疏,所以我不能真正告诉你。)

    EventArgs prev = null;

    sequence.Subscribe(curr => 
    {
        if (prev != null)
        {
            // Previous and current element available here
        }

        prev = curr;                              

    });

EventArgs 只是事件参数类型的替代。


0
投票

接受的答案很好,我建议的唯一更改是使用 Tuple 字段名称 而不是 Tuple.Create 或结果选择器。

public static IObservable<(T? Previous, T? Current)> CombineWithPrevious<T>(this IObservable<T> source)
{
  (T? Previous, T? Current) seed = (default(T), default(T));
  return source.Scan(seed, (combination, latest) => (combination.Current, latest));
}

使用示例:

var foo = bar.CombineWithPrevious().Select(x => $"{x.Current} {x.Previous}");

-1
投票

事实证明,您可以使用变量来保存先前的值并引用它,并在

IObservable
扩展链中重新分配它。这甚至可以在辅助方法中工作。使用下面的代码,我现在可以在
CombineWithPrevious()
上调用
IObservable
来获取对先前值的引用,而无需重新评估序列。

public class ItemWithPrevious<T>
{
    public T Previous;
    public T Current;
}

public static class MyExtensions
{
    public static IObservable<ItemWithPrevious<T>> CombineWithPrevious<T>(this IObservable<T> source)
    {
        var previous = default(T);

        return source
            .Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
            .Do(items => previous = items.Current);
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.