被动扩展:节气门/样本,间隔不断变化

问题描述 投票:7回答:3

我有一个IObservable,它以随机的间隔产生值,我想限制此序列。我发现的一件事是Throttle运算符对“节流”的定义与我的不同。

Throttle仅在经过指定的时间间隔后生成值[(它会生成最后看到的值)。我认为节流将意味着在指定的时间间隔内产生值(当然,除非保持沉默)。

说,我期望Observable.Interval(100).Select((_,i) => i).Throttle(200)产生(模数任何性能/时序问题)偶数,因为我将其限制为“半速”。但是,该序列根本不会产生任何值,因为永远不会有长度为200的静默期。

因此,我发现Sample实际上会执行我想要的“限制”行为。 Observable.Interval(100).Select((_,i) => i).Sample(200)产生(再次对所有性能/时序问题取模)偶数序列。

但是,我还有另一个问题:时间间隔会有所不同,具体取决于最后的“采样”值。我想要的是写一个看起来像这样的运算符:

public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector);

intervalSelector参数为下一个样本生成间隔,并且第一个样本...是从第一个值获取,还是从其他参数获取,我不在乎。

我曾尝试写这篇文章,但最终却遇到了一个复杂的大型结构,无法正常工作。我的问题是,我可以使用现有的运算符(也称为单线)构建它吗?

.net system.reactive
3个回答
5
投票
几个小时后,我睡了一会,就明白了。

public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector) { return source.TimeInterval() .Scan(Tuple.Create(TimeSpan.Zero, false, default(T)), (acc, v) => { if(v.Interval >= acc.Item1) { return Tuple.Create(intervalSelector(v.Value), true, v.Value); } return Tuple.Create(acc.Item1 - v.Interval, false, v.Value); }) .Where(t => t.Item2) .Select(x => x.Item3); }

[这如我所愿:每次产生一个值x,它就会停止产生值,直到intervalSelector(x)时间过去。

0
投票
不是您要的for Observable.BufferWithTime吗?

0
投票
我以另一种方式做到了,我想与任何有用的人分享。

var random = new Random(); Observable.Return(Unit.Default) .SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(random.Next(1, 6)))) .TimeInterval() .Do(value => Console.WriteLine(value)) .Repeat() .Subscribe();

© www.soinside.com 2019 - 2024. All rights reserved.