我想创建一个可观察对象(Observable):它会向新订阅者重放最近一条符合条件的消息,同时让其他消息照常通过;但我不知道如何用现有的 API 来实现这一点。
目标是让下面的程序运行通过:
using System.Reactive.Linq;
using System.Reactive.Subjects;
/// <summary>
/// Demo harness describing the desired behavior of <c>ReplayWhere</c>:
/// every subscriber sees all live values, and a late subscriber is first
/// replayed the most recent value whose first tuple item is 'A'.
/// </summary>
internal class Program
{
    /// <summary>
    /// Pushes values through a subject while three subscribers join at different
    /// times, then throws if any subscriber recorded an unexpected sequence.
    /// </summary>
    public static void Main(string[] args)
    {
        var source = new Subject<(char, int)>();
        var shared = source
            .ReplayWhere(1, item => item.Item1 == 'A')
            .RefCount(1);

        // First subscriber joins before anything is emitted.
        List<(char, int)> early = [];
        shared.Subscribe(early.Add);
        source.OnNext(('A', 1));
        source.OnNext(('B', 1));
        source.OnNext(('C', 1));

        // Second subscriber joins after the first batch.
        List<(char, int)> middle = [];
        shared.Subscribe(middle.Add);
        source.OnNext(('A', 2));
        source.OnNext(('C', 2));

        // Third subscriber joins after everything has been emitted.
        List<(char, int)> late = [];
        shared.Subscribe(late.Add);

        Expect(early, [('A', 1), ('B', 1), ('C', 1), ('A', 2), ('C', 2)]);
        Expect(middle, [('A', 1), ('A', 2), ('C', 2)]);
        Expect(late, [('A', 2)]);

        // Throws when the recorded values differ from the expected sequence.
        static void Expect(List<(char, int)> actual, (char, int)[] expected)
        {
            if (!actual.SequenceEqual(expected))
            {
                throw new Exception();
            }
        }
    }
}
/// <summary>
/// Extension methods for <see cref="IObservable{T}"/>.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Placeholder for an operator that should replay the latest values matching
    /// <paramref name="predicate"/> to new subscribers while passing every value through.
    /// </summary>
    /// <remarks>
    /// Not implemented yet: <paramref name="buffer"/> and <paramref name="predicate"/>
    /// are currently ignored and the source is simply wrapped with <c>Replay(0)</c>.
    /// </remarks>
    /// <param name="source">The sequence to share.</param>
    /// <param name="buffer">Intended number of matching values to retain (unused for now).</param>
    /// <param name="predicate">Intended filter selecting values to retain (unused for now).</param>
    /// <returns>The connectable observable produced by <c>source.Replay(0)</c>.</returns>
    public static IConnectableObservable<T> ReplayWhere<T>(this IObservable<T> source, int buffer, Func<T, bool> predicate)
        //TODO: How to achieve this
        => source.Replay(0);
}
我研究了各种可行的做法,但似乎唯一能用来扩展 `IConnectableObservable` 的公开 API 是 `Multicast`。这种做法的缺点是需要创建自定义的 `Subject`,感觉既复杂又容易出错。
非常感谢任何建议:)
好吧,对于简单的实现来说,`Multicast` 方法并没有我想象的那么糟糕:
// See https://aka.ms/new-console-template for more information
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;
/// <summary>
/// Demo harness that exercises <c>ReplayWhere</c> with three subscribers joining
/// at different points of the stream and verifies what each one received.
/// </summary>
internal class Program
{
    /// <summary>
    /// Emits tuples through a subject, subscribing three recorders along the way,
    /// and throws if any recorder's contents differ from the expected sequence.
    /// </summary>
    public static void Main(string[] args)
    {
        var input = new Subject<(char, int)>();
        var replaying = input.ReplayWhere(1, pair => pair.Item1 == 'A');
        var target = replaying.RefCount(1);

        // Subscriber 1: present from the start, so it should see every value.
        var seenByFirst = new List<(char, int)>();
        target.Subscribe(value => seenByFirst.Add(value));
        input.OnNext(('A', 1));
        input.OnNext(('B', 1));
        input.OnNext(('C', 1));

        // Subscriber 2: should get the last 'A' replayed, then the live values.
        var seenBySecond = new List<(char, int)>();
        target.Subscribe(value => seenBySecond.Add(value));
        input.OnNext(('A', 2));
        input.OnNext(('C', 2));

        // Subscriber 3: should only get the replayed latest 'A'.
        var seenByThird = new List<(char, int)>();
        target.Subscribe(value => seenByThird.Add(value));

        if (Differs(seenByFirst, ('A', 1), ('B', 1), ('C', 1), ('A', 2), ('C', 2)))
        {
            throw new Exception();
        }

        if (Differs(seenBySecond, ('A', 1), ('A', 2), ('C', 2)))
        {
            throw new Exception();
        }

        if (Differs(seenByThird, ('A', 2)))
        {
            throw new Exception();
        }

        // True when the recorded values are not exactly the expected sequence.
        static bool Differs(List<(char, int)> recorded, params (char, int)[] expected)
        {
            return !recorded.SequenceEqual(expected);
        }
    }
}
/// <summary>
/// Extension methods for <see cref="IObservable{T}"/>.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Multicasts <paramref name="source"/> through a <see cref="ReplayWhereSubject{T}"/>,
    /// so that every value is passed through to subscribers and the most recent values
    /// matching <paramref name="predicate"/> are replayed to each new subscriber.
    /// </summary>
    /// <typeparam name="T">Element type of the sequence.</typeparam>
    /// <param name="source">The sequence to share.</param>
    /// <param name="buffer">Maximum number of matching values kept for replay.</param>
    /// <param name="predicate">Selects which values are kept for replay.</param>
    /// <returns>A connectable observable backed by the replaying subject.</returns>
    public static IConnectableObservable<T> ReplayWhere<T>(this IObservable<T> source, int buffer, Func<T, bool> predicate)
    {
        // Forward the caller's buffer size rather than a fixed size of 1,
        // so the parameter actually controls how many values are retained.
        return source.Multicast(new ReplayWhereSubject<T>(buffer, predicate));
    }
}
/// <summary>
/// A subject that forwards every notification to its subscribers and additionally
/// remembers the most recent values accepted by a predicate. Each new subscriber
/// first receives the remembered values (oldest first) and then the live notifications.
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
public class ReplayWhereSubject<T> : ISubject<T>
{
    // Guards the buffer and makes "replay, then attach to the live stream" atomic
    // with respect to incoming notifications, so a value arriving during Subscribe
    // is neither lost nor delivered twice to the new subscriber.
    private readonly object _gate = new();
    private readonly int _bufferSize;
    private readonly Func<T, bool> _predicate;
    private readonly Queue<T> _lastValues = new();
    private readonly Subject<T> _innerSub = new();

    /// <summary>
    /// Creates the subject.
    /// </summary>
    /// <param name="bufferSize">
    /// Maximum number of matching values retained for replay. A value of zero or
    /// less results in nothing being retained.
    /// </param>
    /// <param name="predicate">Decides whether a value is retained for replay.</param>
    /// <exception cref="ArgumentNullException"><paramref name="predicate"/> is null.</exception>
    public ReplayWhereSubject(int bufferSize, Func<T, bool> predicate)
    {
        // Fail at construction instead of on the first OnNext.
        ArgumentNullException.ThrowIfNull(predicate);

        _bufferSize = bufferSize;
        _predicate = predicate;
    }

    /// <summary>Forwards completion to all current subscribers.</summary>
    public void OnCompleted()
    {
        lock (_gate)
        {
            _innerSub.OnCompleted();
        }
    }

    /// <summary>Forwards the error to all current subscribers.</summary>
    /// <param name="error">The error to propagate.</param>
    public void OnError(Exception error)
    {
        lock (_gate)
        {
            _innerSub.OnError(error);
        }
    }

    /// <summary>
    /// Retains <paramref name="value"/> for replay when it satisfies the predicate,
    /// then forwards it to all current subscribers.
    /// </summary>
    /// <param name="value">The value to publish.</param>
    public void OnNext(T value)
    {
        // Subscribers are invoked while the gate is held; the lock is reentrant,
        // so a subscriber may call back into this subject on the same thread.
        lock (_gate)
        {
            if (_predicate(value))
            {
                Enqueue(value);
            }

            _innerSub.OnNext(value);
        }
    }

    // Adds a value to the replay buffer and evicts the oldest entry when the
    // buffer exceeds its capacity. Must be called with _gate held.
    private void Enqueue(T value)
    {
        _lastValues.Enqueue(value);
        if (_lastValues.Count > _bufferSize)
        {
            _lastValues.Dequeue();
        }
    }

    /// <summary>
    /// Replays the retained values to <paramref name="observer"/> and then attaches
    /// it to the live stream.
    /// </summary>
    /// <param name="observer">The observer to subscribe.</param>
    /// <returns>A disposable that detaches the observer from the live stream.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        ArgumentNullException.ThrowIfNull(observer);

        lock (_gate)
        {
            // Iterate over a copy: the observer may publish into this subject
            // from its OnNext, which would otherwise mutate the queue mid-enumeration.
            foreach (var retained in _lastValues.ToArray())
            {
                observer.OnNext(retained);
            }

            return _innerSub.Subscribe(observer);
        }
    }
}