我需要RX中的磁滞滤波器的功能。仅当先前发出的值和当前输入值相差一定量时,才应从源流中发出一个值。作为通用扩展方法,它可以具有以下签名:
public static IObservable<T> HysteresisFilter<T>(this IObservable<t> source, Func<T/*previously emitted*/, T/*current*/, bool> filter)
我无法弄清楚如何使用现有的运营商来实现这一点。我正在寻找来自RxJava的lift之类的东西,这是创建我自己的运算符的任何其他方法。我已经看到了此checklist,但在网上找不到任何示例。
以下方法(实际上都是相同的)对我来说似乎是解决方法,但是还有更多的[[Rx方法来做到这一点,就像没有包装subject
或实际上没有实现运算符吗?
async Task Main()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var rnd = new Random();
var s = Observable.Interval(TimeSpan.FromMilliseconds(10))
.Scan(0d, (a,_) => a + rnd.NextDouble() - 0.5)
.Publish()
.AutoConnect()
;
s.Subscribe(Console.WriteLine, cts.Token);
s.HysteresisFilter((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"1> {x}"), cts.Token);
s.HysteresisFilter2((p, c) => Math.Abs(p - c) > 1d).Subscribe(x => Console.WriteLine($"2> {x}"), cts.Token);
await Task.Delay(Timeout.InfiniteTimeSpan, cts.Token).ContinueWith(_=>_, TaskContinuationOptions.OnlyOnCanceled);
}
public static class ReactiveOperators
{
public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source, Func<T, T, bool> filter)
{
return new InternalHysteresisFilter<T>(source, filter).AsObservable;
}
public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source, Func<T, T, bool> filter)
{
var subject = new Subject<T>();
T lastEmitted = default;
bool emitted = false;
source.Subscribe(
value =>
{
if (!emitted || filter(lastEmitted, value))
{
subject.OnNext(value);
lastEmitted = value;
emitted = true;
}
}
, ex => subject.OnError(ex)
, () => subject.OnCompleted()
);
return subject;
}
private class InternalHysteresisFilter<T>: IObserver<T>
{
Func<T, T, bool> filter;
T lastEmitted;
bool emitted;
private readonly Subject<T> subject = new Subject<T>();
public IObservable<T> AsObservable => subject;
public InternalHysteresisFilter(IObservable<T> source, Func<T, T, bool> filter)
{
this.filter = filter;
source.Subscribe(this);
}
public IDisposable Subscribe(IObserver<T> observer)
{
return subject.Subscribe(observer);
}
public void OnNext(T value)
{
if (!emitted || filter(lastEmitted, value))
{
subject.OnNext(value);
lastEmitted = value;
emitted = true;
}
}
public void OnError(Exception error)
{
subject.OnError(error);
}
public void OnCompleted()
{
subject.OnCompleted();
}
}
}
旁注:将有数千个这样的过滤器应用于尽可能多的流。我需要吞吐量和延迟之间的联系,因此,即使其他设备看起来比较理想,我也在寻找一种解决方案,以在CPU和内存中使用最少的开销。
Observable.Create
创建新的运算符。Create工厂方法是实现自定义可观察序列的首选方法。主题的使用应很大程度上保留在样本和测试领域。 (citation)
public static IObservable<T> HysteresisFilter<T>(this IObservable<T> source,
Func<T, T, bool> predicate)
{
return Observable.Create<T>(observer =>
{
T lastEmitted = default;
bool emitted = false;
return source.Subscribe(value =>
{
if (!emitted || predicate(lastEmitted, value))
{
observer.OnNext(value);
lastEmitted = value;
emitted = true;
}
}, observer.OnError, observer.OnCompleted);
});
}
Observable.Create
,我通常会避免。public static IObservable<T> HysteresisFilter2<T>(this IObservable<T> source,
Func<T, T, bool> predicate)
{
return source
.Scan((emitted: default(T), isFirstItem: true, emit: false), (state, newItem) => state.isFirstItem || predicate(state.emitted, newItem)
? (newItem, false, true)
: (state.emitted, false, false)
)
.Where(t => t.emit)
.Select(t => t.emitted);
}
.Scan
是您在可观察的项目之间跟踪状态时要使用的内容。