从同一反应性流中重组元素

问题描述 投票:1回答:1

我想要实现的目标可以描述如下:

  • 我有一个samples流,带有时间戳的测量值。这是原始流。
  • 我在原始流上应用了一个过滤器,从而得到了派生流(它将是来自this question的滞后滤波器,但是为了简单起见,我在这里使用了Where运算符)
  • 为了消除由于值缓慢变化而造成的巨大差距,我将Sample运算符应用于原始流
  • 我正在将两个流合并为结果流

概念看起来像这样:

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

var s = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().AutoConnect();

var s1 = s.Where(x => x % 5 == 0);
var s2 = s.Sample(TimeSpan.FromMilliseconds(1000));

new[] {s1, s2}.Merge()./*Distinct().*/Subscribe(Console.WriteLine, cts.Token);

await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);

原始来源很热。如果没有Distinct,我显然会得到重复的值,它看起来会产生我期望看到的结果。

有没有更好的方法,事实是,第一个派生流不是周期性的?

c# system.reactive rx.net
1个回答
1
投票

您可以将索引附加到源可观察对象中,然后将DistinctUntilChanged应用到最终合并的可观察对象中。

var withIndex = s.Select((x, i) => (Item : x, Index : i));
var s1 = withIndex.Where(p => p.Item % 5 == 0);
var s2 = withIndex.Sample(TimeSpan.FromMilliseconds(1000));

new[] { s1, s2 }
    .Merge()
    .DistinctUntilChanged(p => p.Index) // discard duplicates
    .Select(p => p.Item) // discard the index
    .Subscribe(Console.WriteLine, cts.Token);

我想运算符DistinctUntilChangedDistinct更轻巧,因为它仅缓存最新的元素。

© www.soinside.com 2019 - 2024. All rights reserved.