在
IObservable
序列中(在 .NET 的响应式扩展中),我想获取先前元素和当前元素的值,以便可以比较它们。我在网上找到了一个类似于下面的示例,它完成了任务:
sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur })
它工作得很好,只是它对序列进行了两次评估,这是我想避免的。您可以看到使用以下代码对其进行了两次评估:
var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence"));
debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe();
输出显示的调试行数量是序列中元素数量的两倍。
我明白为什么会发生这种情况,但到目前为止我还没有找到一种不两次评估序列的替代方案。如何仅通过一次序列评估将上一个和当前的结果结合起来?
我认为有一个更好的解决方案,即使用 Observable.Scan 并避免双重订阅:
public static IObservable<Tuple<TSource, TSource>>
PairWithPrevious<TSource>(this IObservable<TSource> source)
{
return source.Scan(
Tuple.Create(default(TSource), default(TSource)),
(acc, current) => Tuple.Create(acc.Item2, current));
}
我已将其写在我的博客上:http://www.zerobugbuild.com/?p=213
进一步的修改允许您通过使用结果选择器更干净地处理任意类型:
public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
this IObservable<TSource> source,
Func<TSource, TSource, TResult> resultSelector)
{
return source.Scan(
Tuple.Create(default(TSource), default(TSource)),
(previous, current) => Tuple.Create(previous.Item2, current))
.Select(t => resultSelector(t.Item1, t.Item2));
}
@James World 附录对我来说看起来很棒,如果不是因为
Tuple<>
,我几乎总是不喜欢它:“.Item1 是前一个吗?还是当前的?我不记得了。第一个参数是什么选择器,是前一项吗?”。
对于这一部分,我喜欢@dcstraw 对专用
ItemWithPrevious<T>
的定义。因此,您可以将两者放在一起(希望我没有将以前的与当前的混淆)并进行一些重命名和设施:
public static class ObservableExtensions
{
public static IObservable<SortedPair<TSource>> CombineWithPrevious<TSource>(
this IObservable<TSource> source,
TSource initialValue = default(TSource))
{
var seed = SortedPair.Create(initialValue, initialValue);
return source.Scan(seed,
(acc, current) => SortedPair.Create(current, acc.Current));
}
public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
this IObservable<TSource> source,
Func<SortedPair<TSource>, TResult> resultSelector,
TSource initialValue = default(TSource))
{
var seed = SortedPair.Create(initialValue, initialValue);
return source
.Scan(seed,
(acc, current) => SortedPair.Create(current, acc.Current))
.Select(p => resultSelector(p));
}
}
public class SortedPair<T>
{
public SortedPair(T current, T previous)
{
Current = current;
Previous = previous;
}
public SortedPair(T current) : this(current, default(T)) { }
public SortedPair() : this(default(T), default(T)) { }
public T Current;
public T Previous;
}
public class SortedPair
{
public static SortedPair<T> Create<T>(T current, T previous)
{
return new SortedPair<T>(current, previous);
}
public static SortedPair<T> Create<T>(T current)
{
return new SortedPair<T>(current);
}
public static SortedPair<T> Create<T>()
{
return new SortedPair<T>();
}
}
评估两次是冷可观察值的指标。您可以使用 .Publish() 将其转为热门:
var pub = sequence.Publish();
pub.Zip(pub.Skip(1), (...
pub.Connect();
如果您只需要在订阅期间访问前一个元素,这可能是最简单的方法。 (我确信有更好的方法,也许是 IObservable 上的缓冲区运算符?目前文档非常稀疏,所以我不能真正告诉你。)
EventArgs prev = null;
sequence.Subscribe(curr =>
{
if (prev != null)
{
// Previous and current element available here
}
prev = curr;
});
EventArgs 只是事件参数类型的替代。
接受的答案很好,我建议的唯一更改是使用 Tuple 字段名称 而不是 Tuple.Create 或结果选择器。
public static IObservable<(T? Previous, T? Current)> CombineWithPrevious<T>(this IObservable<T> source)
{
(T? Previous, T? Current) seed = (default(T), default(T));
return source.Scan(seed, (combination, latest) => (combination.Current, latest));
}
使用示例:
var foo = bar.CombineWithPrevious().Select(x => $"{x.Current} {x.Previous}");
事实证明,您可以使用变量来保存先前的值并引用它,并在
IObservable
扩展链中重新分配它。这甚至可以在辅助方法中工作。使用下面的代码,我现在可以在 CombineWithPrevious()
上调用 IObservable
来获取对先前值的引用,而无需重新评估序列。
public class ItemWithPrevious<T>
{
public T Previous;
public T Current;
}
public static class MyExtensions
{
public static IObservable<ItemWithPrevious<T>> CombineWithPrevious<T>(this IObservable<T> source)
{
var previous = default(T);
return source
.Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
.Do(items => previous = items.Current);
}
}