如何在rx.net中实现我自己的运算符

问题描述 投票:1回答:2

我需要RX中的磁滞滤波器的功能。仅当先前发出的值和当前输入值相差一定量时,才应从源流中发出一个值。作为通用扩展方法,它可以具有以下签名:

public static IObservable<T> HysteresisFilter<T>(this IObservable<t> source, Func<T/*previously emitted*/, T/*current*/, bool> filter)

我无法弄清楚如何使用现有的运营商来实现这一点。我正在寻找来自RxJava的lift之类的东西,这是创建我自己的运算符的任何其他方法。我已经看到了此checklist,但在网上找不到任何示例。

以下方法(实际上都是相同的)对我来说似乎是解决方法,但是还有更多的[[Rx方法来做到这一点,就像没有包装subject或实际上没有实现运算符吗?

async Task Main() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var rnd = new Random(); var s = Observable.Interval(TimeSpan.FromMilliseconds(10)) .Scan(0d, (a,_) => a + rnd.NextDouble() - 0.5) .Publish() .AutoConnect() ; s.Subscribe(Console.WriteLine, cts.Token); s.HysteresisFilter((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"1> {x}"), cts.Token); s.HysteresisFilter2((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"2> {x}"), cts.Token); await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled); } public static class ReactiveOperators { public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source, Func<T, T, bool> filter) { return new InternalHysteresisFilter<T>(source, filter).AsObservable; } public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source, Func<T, T, bool> filter) { var subject = new Subject<T>(); T lastEmitted = default; bool emitted = false; source.Subscribe( value => { if (!emitted || filter(lastEmitted, value)) { subject.OnNext(value); lastEmitted = value; emitted = true; } } , ex => subject.OnError(ex) , () => subject.OnCompleted() ); return subject; } private class InternalHysteresisFilter<T>: IObserver<T> { Func<T, T, bool> filter; T lastEmitted; bool emitted; private readonly Subject<T> subject = new Subject<T>(); public IObservable<T> AsObservable => subject; public InternalHysteresisFilter(IObservable<T> source, Func<T, T, bool> filter) { this.filter = filter; source.Subscribe(this); } public IDisposable Subscribe(IObserver<T> observer) { return subject.Subscribe(observer); } public void OnNext(T value) { if (!emitted || filter(lastEmitted, value)) { subject.OnNext(value); lastEmitted = value; emitted = true; } } public void OnError(Exception error) { subject.OnError(error); } public void OnCompleted() { subject.OnCompleted(); } } }
旁注:将有数千个这样的过滤器应用于尽可能多的流。我需要吞吐量和延迟之间的联系,因此,即使其他设备看起来比较理想,我也在寻找一种解决方案,以在CPU和内存中使用最少的开销。 
c# system.reactive rx.net
2个回答
0
投票
我在书Introduction to Rx中看到的大多数示例都使用方法Observable.Create创建新的运算符。

Create工厂方法是实现自定义可观察序列的首选方法。主题的使用应很大程度上保留在样本和测试领域。 (citation

public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source, Func<T, T, bool> predicate) { return Observable.Create<T>(observer => { T lastEmitted = default; bool emitted = false; return source.Subscribe(value => { if (!emitted || predicate(lastEmitted, value)) { observer.OnNext(value); lastEmitted = value; emitted = true; } }, observer.OnError, observer.OnCompleted); }); }

0
投票
此答案与@Theodor相同,但避免使用Observable.Create,我通常会避免。

public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source, Func<T, T, bool> predicate) { return source .Scan((emitted: default(T), isFirstItem: true, emit: false), (state, newItem) => state.isFirstItem || predicate(state.emitted, newItem) ? (newItem, false, true) : (state.emitted, false, false) ) .Where(t => t.emit) .Select(t => t.emitted); }

.Scan是您在可观察的项目之间跟踪状态时要使用的内容。 
© www.soinside.com 2019 - 2024. All rights reserved.