With Rx, how do I ignore all-except-the-latest value when my Subscribe method is running?

Question. Votes: 33, Answers: 9

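Using Reactive Extensions, I want to ignore messages from my event stream that arrive while my Subscribe method is running. That is, processing a message sometimes takes longer than the interval between messages, so I want to drop the messages I have no time to process.

However, when my Subscribe method completes, if any messages did come through in the meantime, I want to process the last one. That way I always process the most recent message.

So, if I have some code which does: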

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

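Assume '100' takes a long time to process. Then I want '2' to be processed when '100' completes. '1' should be ignored, because it was superseded by '2' while '100' was still being processed.

Here is an example of the result I want, using a background task and Latest():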

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

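However, Latest() is a blocking call, and I would rather not have a thread sitting and waiting for the next value like this (there are sometimes very long gaps between messages).

I can also get the result I want by using a BroadcastBlock from TPL Dataflow, like this: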

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

But it feels like this should be possible directly in Rx. What is the best way to go about it?

c# system.reactive
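9 Answers
8 votes

Here is an approach similar to Dave's, but it uses Sample instead (which is more appropriate than Buffer). I have included an extension method similar to the one I added to Dave's answer.

The extension: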

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();

    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });

    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));

    return sub;
}

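Note that it is simpler, and no "empty" buffer is ever fired. The first element sent to the action actually comes from the stream itself.

Usage is straightforward: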

messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});

messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing

Results:

source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10

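3 votes

Here is an attempt using "just" Rx. The timer and the subscriber are kept independent by observing on the thread pool, and I have used a subject to provide feedback when a task completes.

I don't think this is a simple solution, but I hope it gives you ideas for improvement.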

var feedback = new Subject<Unit>(); // signals that the subscriber has finished a message

messages.
    Buffer(() => feedback).
    Select(l => l.LastOrDefault()).
    ObserveOn(Scheduler.ThreadPool).
    Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
        feedback.OnNext(Unit.Default);
    });

feedback.OnNext(Unit.Default);

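There is one small problem: the buffer is first closed while it is still empty, so it produces the default value. You could probably solve this by sending the feedback only after the first message.

Here it is as an extension method: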

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var feedback = new Subject<Unit>();

    var sub = source.
        Buffer(() => feedback).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l.LastOrDefault());
            feedback.OnNext(Unit.Default);
        });

    feedback.OnNext(Unit.Default);

    return sub;
}

Usage:

    messages.SubscribeWithoutOverlap(n =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(n);
    });

3 votes

Thanks to Lee Campbell (of Intro To Rx fame), I now have a working solution using this extension method:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

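3 votes

I have written a blog post about this, with a solution that uses CAS instead of locks and avoids recursion. The code is below, but you can find a complete explanation here: http://www.zerobugbuild.com/?p=192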

public static IObservable<TSource> ObserveLatestOn<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler)
{
    return Observable.Create<TSource>(observer =>
    {
        Notification<TSource> pendingNotification = null;
        var cancelable = new MultipleAssignmentDisposable();

        var sourceSubscription = source.Materialize()
            .Subscribe(notification =>
            {
                var previousNotification = Interlocked.Exchange(
                    ref pendingNotification, notification);

                if (previousNotification != null) return;

                cancelable.Disposable = scheduler.Schedule(() =>
                    {
                        var notificationToSend = Interlocked.Exchange(
                            ref pendingNotification, null);
                        notificationToSend.Accept(observer);
                    });
            });
        return new CompositeDisposable(sourceSubscription, cancelable);
    });
}
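
The core of the CAS idea can be shown without Rx at all. The following is a minimal sketch, not the answer's code: `LatestSlot<T>` and `LatestSlotDemo` are hypothetical names introduced for illustration. It relies only on `Interlocked.Exchange`, which atomically swaps a reference and returns the previous one, so a producer can tell whether the slot was empty (and a drain needs scheduling) and a consumer always picks up the newest value.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of Rx): a single-slot mailbox that keeps only the newest value.
public sealed class LatestSlot<T> where T : class
{
    private T _pending;

    // Stores the value. Returns true when the slot was empty,
    // i.e. the caller would need to schedule a drain.
    public bool Offer(T value) => Interlocked.Exchange(ref _pending, value) == null;

    // Atomically takes the newest value and empties the slot; null if nothing is pending.
    public T Take() => Interlocked.Exchange(ref _pending, null);
}

public static class LatestSlotDemo
{
    // Replays the question's scenario: 100 is being processed while 1 and 2 arrive.
    public static string Run()
    {
        var slot = new LatestSlot<string>();
        slot.Offer("100");
        string processing = slot.Take();        // the consumer starts work on "100"
        bool firstSchedules = slot.Offer("1");  // slot was empty, so a drain is needed
        bool secondSchedules = slot.Offer("2"); // replaces "1", a drain is already pending
        string next = slot.Take();              // only the newest value survives
        string after = slot.Take() ?? "none";   // nothing left
        return $"{processing}|{firstSchedules},{secondSchedules}|{next}|{after}";
    }
}
```

Calling `LatestSlotDemo.Run()` yields `100|True,False|2|none`: the value 1 is overwritten before anyone reads it, which is the behaviour the question asks for.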

2 votes

An example using Observable.Switch. It also handles the case where you finish the task but there is nothing in the queue.

using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive
{
    public static class RXX
    {
        public static IDisposable SubscribeWithoutOverlap<T>
        ( this IObservable<T> source
        , Action<T> action
        , IScheduler scheduler = null)
        {
            var sampler = new Subject<Unit>();
            scheduler = scheduler ?? Scheduler.Default;
            var p = source.Publish();
            var connection = p.Connect();

            var subscription = sampler.Select(x=>p.Take(1))
                .Switch()
                .ObserveOn(scheduler)
                .Subscribe(l =>
                {
                    action(l);
                    sampler.OnNext(Unit.Default);
                });

            sampler.OnNext(Unit.Default);

            return new CompositeDisposable(connection, subscription);
        }
    }
}

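2 votes

I have just completed (and already completely revised) my own solution to the problem, which I plan to use in production.

Unless the scheduler uses the current thread, calls to OnNext, OnCompleted and OnError from the source should return immediately. If the observer is busy with previous notifications, new ones go into a queue with a specifiable maximum size, and they are delivered whenever the previous notification has been processed. If the queue fills up, the least recent items are discarded. So a maximum queue size of 0 ignores all items that arrive while the observer is busy; a size of 1 always lets the observer see the latest item; a size of int.MaxValue keeps the consumer busy until it catches up with the producer.

If the scheduler supports long-running operations (i.e. gives you a thread of your own), I schedule a loop to notify the observer; otherwise I use recursive scheduling.

Here is the code. Any comments are appreciated.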

partial class MoreObservables
{
    /// <summary>
    /// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
    /// </summary>
    /// <param name="source">The source sequence.</param>
    /// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
    /// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
    /// <remarks>
    /// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
    /// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
    /// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
    /// </remarks>
    public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
        if (scheduler == null) scheduler = Scheduler.Default;

        return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer));
    }

    private static class LatestImpl<TSource>
    {
        public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));

            var longrunningScheduler = scheduler.AsLongRunning();
            if (longrunningScheduler != null)
                return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer);

            return new RecursiveSubscription(source, maxQueueSize, scheduler, observer);
        }

        #region Subscriptions

        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop.
        /// </summary>
        private sealed class LoopSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Head, // next notification is in _head
                Queue, // next notifications are in _queue, followed by _completion
                Disposed, // disposed
            }

            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly IObserver<TSource> _observer;
            private State _state;
            private TSource _head; // item in front of the queue
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification

            public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer)
            {
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                scheduler.ScheduleLongRunning(_ => Loop());
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }

            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _head = value;
                            _state = State.Head;
                            Monitor.Pulse(_subscription);
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }

            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _state = State.Queue;
                            Monitor.Pulse(_subscription);
                            _subscription.Dispose();
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }

            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;

                    _head = default(TSource);
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    Monitor.Pulse(_subscription);
                    _subscription.Dispose();
                }
            }

            private void Loop()
            {
                try
                {
                    while (true) // overall loop for all notifications
                    {
                        // next notification to emit
                        Notification<TSource> completion;
                        TSource next; // iff completion == null

                        lock (_subscription)
                        {
                            while (true)
                            {
                                while (_state == State.Idle)
                                    Monitor.Wait(_subscription);

                                if (_state == State.Head)
                                {
                                    completion = null;
                                    next = _head;
                                    _head = default(TSource);
                                    _state = State.Queue;
                                    break;
                                }
                                if (_state == State.Queue)
                                {
                                    if (!_queue.IsEmpty)
                                    {
                                        completion = null;
                                        next = _queue.Dequeue(); // assumption: this never throws
                                        break;
                                    }
                                    if (_completion != null)
                                    {
                                        completion = _completion;
                                        next = default(TSource);
                                        break;
                                    }
                                    _state = State.Idle;
                                    continue;
                                }
                                Debug.Assert(_state == State.Disposed);
                                return;
                            }
                        }

                        if (completion != null)
                        {
                            completion.Accept(_observer);
                            return;
                        }
                        _observer.OnNext(next);
                    }
                }
                finally { Dispose(); }
            }
        }

        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively.
        /// </summary>
        private sealed class RecursiveSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Scheduled, // emitter scheduled or executing
                Disposed, // disposed
            }

            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action
            private readonly IScheduler _scheduler;
            private readonly IObserver<TSource> _observer;
            private State _state;
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification

            public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
            {
                _scheduler = scheduler;
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }

            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _emitter.Disposable = _scheduler.Schedule(value, EmitNext);
                            _state = State.Scheduled;
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }

            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion));
                            _state = State.Scheduled;
                            _subscription.Dispose();
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }

            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;

                    _emitter.Dispose();
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    _subscription.Dispose();
                }
            }

            private void EmitNext(TSource value, Action<TSource> self)
            {
                try { _observer.OnNext(value); }
                catch { Dispose(); return; }

                lock (_subscription)
                {
                    if (_state == State.Disposed) return;
                    Debug.Assert(_state == State.Scheduled);
                    if (!_queue.IsEmpty)
                        self(_queue.Dequeue());
                    else if (_completion != null)
                        _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion));
                    else
                        _state = State.Idle;
                }
            }

            private void EmitCompletion(Notification<TSource> completion)
            {
                try { completion.Accept(_observer); }
                finally { Dispose(); }
            }
        }

        #endregion

        #region IQueue

        /// <summary>
        /// FIFO queue that discards least recent items if size limit is reached.
        /// </summary>
        private interface IQueue
        {
            bool IsEmpty { get; }
            void Enqueue(TSource item);
            TSource Dequeue();
        }

        /// <summary>
        /// <see cref="IQueue"/> implementations.
        /// </summary>
        private static class Queue
        {
            public static IQueue Create(int maxSize)
            {
                switch (maxSize)
                {
                    case 0: return Zero.Instance;
                    case 1: return new One();
                    default: return new Many(maxSize);
                }
            }

            private sealed class Zero : IQueue
            {
                // ReSharper disable once StaticMemberInGenericType
                public static Zero Instance { get; } = new Zero();
                private Zero() { }

                public bool IsEmpty => true;
                public void Enqueue(TSource item) { }
                public TSource Dequeue() { throw new InvalidOperationException(); }
            }

            private sealed class One : IQueue
            {
                private TSource _item;

                public bool IsEmpty { get; private set; } = true;

                public void Enqueue(TSource item)
                {
                    _item = item;
                    IsEmpty = false;
                }

                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();

                    var item = _item;
                    _item = default(TSource);
                    IsEmpty = true;
                    return item;
                }
            }

            private sealed class Many : IQueue
            {
                private readonly int _maxSize, _initialSize;
                private int _deq, _enq; // indices of dequeue and enqueue positions
                private TSource[] _buffer;

                public Many(int maxSize)
                {
                    if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize));

                    _maxSize = maxSize;
                    if (maxSize == int.MaxValue)
                        _initialSize = 4;
                    else
                    {
                        // choose an initial size that won't get us too close to maxSize when doubling
                        _initialSize = maxSize;
                        while (_initialSize >= 7)
                            _initialSize = (_initialSize + 1) / 2;
                    }
                }

                public bool IsEmpty { get; private set; } = true;

                public void Enqueue(TSource item)
                {
                    if (IsEmpty)
                    {
                        if (_buffer == null) _buffer = new TSource[_initialSize];
                        _buffer[0] = item;
                        _deq = 0;
                        _enq = 1;
                        IsEmpty = false;
                        return;
                    }
                    if (_deq == _enq) // full
                    {
                        if (_buffer.Length == _maxSize) // overwrite least recent
                        {
                            _buffer[_enq] = item;
                            if (++_enq == _buffer.Length) _enq = 0;
                            _deq = _enq;
                            return;
                        }

                        // increase buffer size
                        var newSize = _buffer.Length >= _maxSize / 2 ? _maxSize : 2 * _buffer.Length;
                        var newBuffer = new TSource[newSize];
                        var count = _buffer.Length - _deq;
                        Array.Copy(_buffer, _deq, newBuffer, 0, count);
                        Array.Copy(_buffer, 0, newBuffer, count, _deq);
                        _deq = 0;
                        _enq = _buffer.Length;
                        _buffer = newBuffer;
                    }
                    _buffer[_enq] = item;
                    if (++_enq == _buffer.Length) _enq = 0;
                }

                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();

                    var result = ReadAndClear(ref _buffer[_deq]);
                    if (++_deq == _buffer.Length) _deq = 0;
                    if (_deq == _enq)
                    {
                        IsEmpty = true;
                        if (_buffer.Length > _initialSize) _buffer = null;
                    }
                    return result;
                }

                private static TSource ReadAndClear(ref TSource item)
                {
                    var result = item;
                    item = default(TSource);
                    return result;
                }
            }
        }

        #endregion
    }
}
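
The effect of the maximum queue size described in this answer can be illustrated in isolation. The sketch below is a deliberately simplified stand-in for the answer's `IQueue` implementations, built on `System.Collections.Generic.Queue<T>`; the names `DropOldestQueue<T>` and `DropOldestQueueDemo` are assumptions made for this example, and it omits the buffer-growth logic of the original.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simplification of the answer's IQueue: discards the least recent item when full.
public sealed class DropOldestQueue<T>
{
    private readonly int _maxSize;
    private readonly Queue<T> _items = new Queue<T>();

    public DropOldestQueue(int maxSize)
    {
        if (maxSize < 0) throw new ArgumentOutOfRangeException(nameof(maxSize));
        _maxSize = maxSize;
    }

    public bool IsEmpty => _items.Count == 0;

    public void Enqueue(T item)
    {
        if (_maxSize == 0) return;                       // size 0: ignore everything while busy
        if (_items.Count == _maxSize) _items.Dequeue();  // full: discard the least recent item
        _items.Enqueue(item);
    }

    public T Dequeue() => _items.Dequeue();
}

public static class DropOldestQueueDemo
{
    // Enqueues 1..5 while the consumer is "busy", then drains whatever is left.
    public static string Drain(int maxSize)
    {
        var queue = new DropOldestQueue<int>(maxSize);
        for (int i = 1; i <= 5; i++) queue.Enqueue(i);

        var seen = new List<int>();
        while (!queue.IsEmpty) seen.Add(queue.Dequeue());
        return string.Join(",", seen);
    }
}
```

With five items arriving while the consumer is busy, `Drain(0)` returns an empty string, `Drain(1)` returns `5`, `Drain(3)` returns `3,4,5`, and `Drain(int.MaxValue)` returns `1,2,3,4,5`, matching the three cases listed in the remarks.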

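1 vote

Here is a Task-based implementation with cancellation semantics that does not use a subject. Calling Dispose lets the subscribed action cancel its processing if desired.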

    public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action)
    {
        var cancellation = new CancellationDisposable();
        var token = cancellation.Token;
        Task task = null;

        return new CompositeDisposable(
            cancellation,
            observable.Subscribe(value =>
            {
                if (task == null || task.IsCompleted)
                    task = Task.Factory.StartNew(() => action(value, token), token);
            })
        );
    }

Here is a simple test:

Observable.Interval(TimeSpan.FromMilliseconds(150))
                      .SampleSubscribe((v, ct) =>
                      {
                          // check for cancellation, do work
                          for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++)
                              Thread.Sleep(100);

                          Console.WriteLine(v);
                      });

Output:

0
7
14
21
28
35

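1 vote

With Rx 2.0 RC you can use Chunkify to get an IEnumerable of lists, each containing whatever was observed since the last MoveNext.

You can then use ToObservable to convert this back to an IObservable and pay attention only to the last entry in each non-empty list.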

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

messages.Chunkify()
        .ToObservable(Scheduler.TaskPool)
        .Where(list => list.Any())
        .Select(list => list.Last())
        .Subscribe(n =>
        {
          Thread.Sleep(TimeSpan.FromMilliseconds(250));
          Console.WriteLine(n);
        });

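0 votes

Yet another solution.

This one is not pretty, because it mixes Task and Observable, so it is not really testable with ReactiveTest (though to be honest, I am not sure how I would implement a "slow" subscriber with ReactiveTest either).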

public static IObservable<T> ShedLoad<T>(this IObservable<T> source)
{
    return Observable.Create<T>(observer =>
    {
        Task task = Task.FromResult(0);
        return source.Subscribe(t =>
        {
            if(task.IsCompleted)
                task = Task.Run(() => observer.OnNext(t));
            else
                Debug.WriteLine("Skip, task not finished");
        }, observer.OnError, observer.OnCompleted);
    });
}

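I suspect there is a race condition in there, but in my opinion, if we are at the stage of throwing things away because they arrive too fast, I don't mind dropping one too many or one too few. Oh, and each OnNext is (potentially) called on a different thread (I suppose I could put a Synchronize on the back of the Create).

I admit I could not get the Materialize extensions to work properly (I hooked one up to a FromEventPattern(MouseMove) and then subscribed with a deliberately slow subscriber, and strangely it would let bursts of events through rather than one at a time).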
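
One way to close the check-then-start race mentioned above is to claim a busy flag atomically before starting the task. This is only a sketch under assumptions: `BusyGate<T>` and `BusyGateDemo` are hypothetical names, it is independent of Rx, and like ShedLoad it drops values that arrive while busy rather than remembering the latest one.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs the action for a value only if no earlier run is still in progress.
public sealed class BusyGate<T>
{
    private readonly Action<T> _action;
    private int _busy; // 0 = idle, 1 = running

    public BusyGate(Action<T> action) => _action = action;

    // Returns the started task, or null when the value was dropped because the gate was busy.
    public Task TryRun(T value)
    {
        // Atomically flip 0 -> 1; only one caller can win, so two runs never overlap.
        if (Interlocked.CompareExchange(ref _busy, 1, 0) != 0) return null;
        return Task.Run(() =>
        {
            try { _action(value); }
            finally { Volatile.Write(ref _busy, 0); } // release the gate even if the action throws
        });
    }
}

public static class BusyGateDemo
{
    public static string Run()
    {
        var processed = new List<int>();
        using (var release = new ManualResetEventSlim(false))
        {
            var gate = new BusyGate<int>(n => { release.Wait(); processed.Add(n); });

            Task first = gate.TryRun(1);   // claims the gate, then blocks on 'release'
            Task second = gate.TryRun(2);  // dropped: the gate is still held by 1
            release.Set();                 // let the slow work finish
            first.Wait();
            Task third = gate.TryRun(3);   // the gate is free again
            third.Wait();

            return $"{second == null}|{string.Join(",", processed)}";
        }
    }
}
```

`BusyGateDemo.Run()` returns `True|1,3`: the value 2 is rejected while 1 is in flight, and 3 is accepted once the gate has been released.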
