`ReplayWhere`:按条件重放的可连接可观察序列(IConnectableObservable)

问题描述 投票:0回答:1

我想创建一个可观察序列(Observable):它会向新订阅者重放符合条件的最新消息,同时让其他消息正常通过。但我不知道如何在现有的 API 接口范围内实现它。

目标是让下面的程序顺利运行(不抛出异常):

using System.Reactive.Linq;
using System.Reactive.Subjects;

/// <summary>
/// Acceptance program for the desired <c>ReplayWhere</c> operator: three
/// subscribers join at different times and each one's received values are
/// compared against the expected sequence.
/// </summary>
internal class Program
{
    /// <summary>
    /// Pushes values through a subject, subscribes three collectors at
    /// different points, and throws if any collector's contents differ from
    /// the expected sequence.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main(string[] args)
    {
        var source = new Subject<(char, int)>();
        var shared = source.ReplayWhere(1, item => item.Item1 == 'A').RefCount(1);

        // First collector: subscribed before anything is published.
        List<(char, int)> first = [];
        shared.Subscribe(first.Add);

        source.OnNext(('A', 1));
        source.OnNext(('B', 1));
        source.OnNext(('C', 1));

        // Second collector: expected to get the latest 'A' value replayed,
        // then the values published afterwards.
        List<(char, int)> second = [];
        shared.Subscribe(second.Add);

        source.OnNext(('A', 2));
        source.OnNext(('C', 2));

        // Third collector: expected to get only the latest 'A' value.
        List<(char, int)> third = [];
        shared.Subscribe(third.Add);

        var expectedFirst = new[] { ('A', 1), ('B', 1), ('C', 1), ('A', 2), ('C', 2) };
        var expectedSecond = new[] { ('A', 1), ('A', 2), ('C', 2) };
        var expectedThird = new[] { ('A', 2) };

        if (!first.SequenceEqual(expectedFirst))
        {
            throw new Exception();
        }

        if (!second.SequenceEqual(expectedSecond))
        {
            throw new Exception();
        }

        if (!third.SequenceEqual(expectedThird))
        {
            throw new Exception();
        }
    }
}

/// <summary>
/// Extension methods for <see cref="IObservable{T}"/>.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Placeholder for the operator being asked about. It currently ignores
    /// <paramref name="buffer"/> and <paramref name="predicate"/> and simply
    /// returns <c>source.Replay(0)</c>.
    /// </summary>
    /// <param name="source">The sequence to share.</param>
    /// <param name="buffer">Intended number of matching values to replay (not used yet).</param>
    /// <param name="predicate">Intended filter selecting the values to replay (not used yet).</param>
    /// <returns>The connectable observable produced by <c>source.Replay(0)</c>.</returns>
    // TODO: How to achieve this
    public static IConnectableObservable<T> ReplayWhere<T>(this IObservable<T> source, int buffer, Func<T, bool> predicate)
        => source.Replay(0);
}

我研究过有哪些可行的做法,但似乎唯一能用来扩展 `IConnectableObservable` 的公开 API 是 `Multicast`。

这种做法的缺点是需要创建自定义的 `Subject`,感觉既复杂又容易出错。

非常感谢任何建议:)

c# system.reactive
1个回答
0
投票

好吧,对于简单的实现来说,`Multicast` 方法并不像我想象的那么糟糕:

// See https://aka.ms/new-console-template for more information

using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;

/// <summary>
/// Smoke test for <c>ReplayWhere</c>: three subscribers join at different
/// times and each one's received values are checked against the expected
/// sequence.
/// </summary>
internal class Program
{
    /// <summary>
    /// Runs the scenario and throws <see cref="InvalidOperationException"/>
    /// describing the first subscriber whose received values are wrong.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main(string[] args)
    {
        var s = new Subject<(char, int)>();

        var targetSub = s
            .ReplayWhere(1, v => v.Item1 == 'A')
            .RefCount(1);

        // Subscribed before anything is published.
        var sub1 = new List<(char, int)>();
        targetSub.Subscribe(sub1.Add);

        s.OnNext(('A', 1));
        s.OnNext(('B', 1));
        s.OnNext(('C', 1));

        // Late subscriber: expects the latest 'A' replayed, then live values.
        var sub2 = new List<(char, int)>();
        targetSub.Subscribe(sub2.Add);

        s.OnNext(('A', 2));
        s.OnNext(('C', 2));

        // Latest subscriber: expects only the most recent 'A'.
        var sub3 = new List<(char, int)>();
        targetSub.Subscribe(sub3.Add);

        Verify(nameof(sub1), sub1, [('A', 1), ('B', 1), ('C', 1), ('A', 2), ('C', 2)]);
        Verify(nameof(sub2), sub2, [('A', 1), ('A', 2), ('C', 2)]);
        Verify(nameof(sub3), sub3, [('A', 2)]);
    }

    /// <summary>
    /// Compares what a subscriber received with the expected sequence and
    /// throws with both sequences in the message when they differ, so a
    /// failure is diagnosable without a debugger.
    /// </summary>
    /// <param name="name">Label of the subscriber, used in the error message.</param>
    /// <param name="actual">Values the subscriber received.</param>
    /// <param name="expected">Values the subscriber should have received, in order.</param>
    /// <exception cref="InvalidOperationException">The sequences differ.</exception>
    private static void Verify(string name, List<(char, int)> actual, (char, int)[] expected)
    {
        if (actual.SequenceEqual(expected))
        {
            return;
        }

        throw new InvalidOperationException(
            $"{name}: expected [{string.Join(", ", expected)}] but received [{string.Join(", ", actual)}].");
    }
}

/// <summary>
/// Extension methods for <see cref="IObservable{T}"/>.
/// </summary>
public static class ObservableExtensions
{
    /// <summary>
    /// Multicasts <paramref name="source"/> through a
    /// <see cref="ReplayWhereSubject{T}"/>, so that values matching
    /// <paramref name="predicate"/> are buffered for later subscribers while
    /// every value is forwarded to current subscribers.
    /// </summary>
    /// <param name="source">The sequence to share.</param>
    /// <param name="buffer">Maximum number of matching values kept for replay.</param>
    /// <param name="predicate">Selects which values are eligible for replay.</param>
    /// <returns>A connectable observable backed by the replaying subject.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="predicate"/> is <c>null</c>.
    /// </exception>
    public static IConnectableObservable<T> ReplayWhere<T>(this IObservable<T> source, int buffer, Func<T, bool> predicate)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(predicate);

        // Honor the caller's buffer size instead of a fixed value.
        return source.Multicast(new ReplayWhereSubject<T>(buffer, predicate));
    }
}

/// <summary>
/// A subject that forwards every value to its current observers and, in
/// addition, remembers the most recent values that satisfy a predicate so
/// they can be replayed to observers that subscribe later.
/// </summary>
/// <typeparam name="T">Type of the values flowing through the subject.</typeparam>
/// <remarks>
/// All state changes and notifications happen while holding a single private
/// lock. This makes "replay the buffer, then attach to the live stream" atomic
/// with respect to <see cref="OnNext"/>, so a new subscriber can neither miss
/// nor receive out of order a value published while it is subscribing. The
/// trade-off is that observers are invoked under that lock, so an observer
/// should not block waiting on another thread that uses this subject.
/// </remarks>
public class ReplayWhereSubject<T> : ISubject<T>
{
    private readonly object _gate = new();
    private readonly int _bufferSize;
    private readonly Func<T, bool> _predicate;

    // Guarded by _gate; holds at most _bufferSize matching values, oldest first.
    private readonly Queue<T> _lastValues = new();

    private readonly Subject<T> _innerSub = new();

    /// <summary>
    /// Creates the subject.
    /// </summary>
    /// <param name="bufferSize">Maximum number of matching values kept for replay; 0 disables replay.</param>
    /// <param name="predicate">Selects which values are eligible for replay.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is negative.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="predicate"/> is <c>null</c>.</exception>
    public ReplayWhereSubject(int bufferSize, Func<T, bool> predicate)
    {
        ArgumentOutOfRangeException.ThrowIfNegative(bufferSize);
        ArgumentNullException.ThrowIfNull(predicate);

        _bufferSize = bufferSize;
        _predicate = predicate;
    }

    /// <summary>Forwards completion to the current observers.</summary>
    public void OnCompleted()
    {
        lock (_gate)
        {
            _innerSub.OnCompleted();
        }
    }

    /// <summary>Forwards an error to the current observers.</summary>
    /// <param name="error">The error to propagate.</param>
    public void OnError(Exception error)
    {
        lock (_gate)
        {
            _innerSub.OnError(error);
        }
    }

    /// <summary>
    /// Buffers <paramref name="value"/> when it satisfies the predicate and
    /// then forwards it to the current observers.
    /// </summary>
    /// <param name="value">The value to publish.</param>
    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (_predicate(value))
            {
                Enqueue(value);
            }

            _innerSub.OnNext(value);
        }
    }

    /// <summary>
    /// Appends a value to the replay buffer and drops the oldest entries
    /// beyond the configured size. Must be called while holding the gate.
    /// </summary>
    private void Enqueue(T value)
    {
        _lastValues.Enqueue(value);
        while (_lastValues.Count > _bufferSize)
        {
            _lastValues.Dequeue();
        }
    }

    /// <summary>
    /// Replays the buffered values to <paramref name="observer"/> and then
    /// attaches it to the live stream.
    /// </summary>
    /// <param name="observer">The observer to subscribe.</param>
    /// <returns>A handle that detaches the observer from the live stream when disposed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        ArgumentNullException.ThrowIfNull(observer);

        lock (_gate)
        {
            // Copy first: the observer may publish to this subject from its
            // own OnNext (the lock is re-entrant), which would otherwise
            // modify the queue while it is being enumerated.
            var snapshot = _lastValues.ToArray();
            foreach (var buffered in snapshot)
            {
                observer.OnNext(buffered);
            }

            return _innerSub.Subscribe(observer);
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.