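Using Reactive Extensions, I want to ignore messages from my event stream that arrive while my Subscribe method is running. That is, processing a message sometimes takes longer than the time between messages, so I want to drop the messages I don't have time to process.
However, when my Subscribe method completes, if any messages did come through in the meantime, I want to process the last one. That way I always process the most recent message.
So, if I have some code which does: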
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
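Let's assume that '100' takes a long time to process. Then I want '2' to be processed when '100' completes. '1' should be ignored, because it was superseded by '2' while '100' was still being processed.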
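To make the desired behaviour concrete without involving Rx at all, here is a minimal sketch. `LatestSlot<T>` is a hypothetical helper invented for this illustration (it is not an Rx type): it keeps at most one pending value, and a newer value overwrites an older one that has not been taken yet.

```csharp
using System;

// Hypothetical helper (not part of Rx): holds at most one pending value.
public sealed class LatestSlot<T>
{
    private readonly object _gate = new object();
    private T _value;
    private bool _hasValue;

    // Store a value, overwriting any value that has not been taken yet.
    public void Post(T value)
    {
        lock (_gate) { _value = value; _hasValue = true; }
    }

    // Take the pending value, if any, and leave the slot empty.
    public bool TryTake(out T value)
    {
        lock (_gate)
        {
            value = _value;
            bool had = _hasValue;
            _value = default(T);
            _hasValue = false;
            return had;
        }
    }
}

public static class LatestSlotDemo
{
    public static void Main()
    {
        var slot = new LatestSlot<int>();
        slot.Post(100);
        slot.TryTake(out int first);   // the consumer starts working on 100
        slot.Post(1);                  // arrives while 100 is being processed
        slot.Post(2);                  // replaces 1 before anyone saw it
        slot.TryTake(out int second);  // the consumer finishes 100 and asks again
        Console.WriteLine(first + ", " + second); // prints "100, 2"
    }
}
```

This only models which values get through. It says nothing about how the consumer is woken up without blocking a thread, which is the actual question.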
Here's an example of the result I want, using a background task and Latest():
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
    foreach (var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});
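However, Latest() is a blocking call, and I'd prefer not to have a thread sitting around waiting for the next value like this (there will sometimes be very long gaps between messages).
I can also get the result I want by using a BroadcastBlock from TPL Dataflow, like this: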
var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });
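But this feels like it should be possible directly in Rx. Is this the best way to go about it?
Here's an approach similar to Dave's, but using Sample instead (which is more appropriate than Buffer). I've included an extension method similar to the one I added to Dave's answer.
The extension: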
public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();
    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });
    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));
    return sub;
}
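Note that it's simpler, and there is no "empty" buffer being fired. The first element sent to the action actually comes from the stream itself.
Usage is straightforward: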
messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});
messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing
Result:
source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
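Here is an attempt using "just" Rx. The timer and the subscriber are kept independent by observing on the thread pool, and I've used a subject to provide feedback when the task completes.
I don't think this is a simple solution, but I hope it gives you ideas for improvement.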
// signals that the subscriber has finished and is ready for the next value
var feedback = new Subject<Unit>();
messages.
    Buffer(() => feedback).
    Select(l => l.LastOrDefault()).
    ObserveOn(Scheduler.ThreadPool).
    Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
        feedback.OnNext(Unit.Default);
    });
feedback.OnNext(Unit.Default);
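There's one small problem: the buffer is first closed while it is still empty, so it produces the default value. You could probably solve that by giving the feedback only after the first message.
Here it is as an extension method: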
public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var feedback = new Subject<Unit>();
    var sub = source.
        Buffer(() => feedback).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l.LastOrDefault());
            feedback.OnNext(Unit.Default);
        });
    feedback.OnNext(Unit.Default);
    return sub;
}
Usage:
messages.SubscribeWithoutOverlap(n =>
{
    Thread.Sleep(1000);
    Console.WriteLine(n);
});
Thanks to Lee Campbell (of Intro To Rx fame), I now have a working solution using this extension method:
public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }
            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}
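I wrote a blog post about this, with a solution that uses CAS instead of locks and avoids recursion. The code is below, but you can find a complete explanation here: http://www.zerobugbuild.com/?p=192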
public static IObservable<TSource> ObserveLatestOn<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler)
{
    return Observable.Create<TSource>(observer =>
    {
        Notification<TSource> pendingNotification = null;
        var cancelable = new MultipleAssignmentDisposable();
        var sourceSubscription = source.Materialize()
            .Subscribe(notification =>
            {
                var previousNotification = Interlocked.Exchange(
                    ref pendingNotification, notification);
                if (previousNotification != null) return;
                cancelable.Disposable = scheduler.Schedule(() =>
                {
                    var notificationToSend = Interlocked.Exchange(
                        ref pendingNotification, null);
                    notificationToSend.Accept(observer);
                });
            });
        return new CompositeDisposable(sourceSubscription, cancelable);
    });
}
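The exchange-based hand-off at the heart of that code can be looked at in isolation. The following is only a sketch using the BCL's Interlocked.Exchange. `PendingBox`, `Offer` and `Take` are hypothetical names made up for this illustration, and a string stands in for the Notification<TSource>.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the pendingNotification field in the code above.
public sealed class PendingBox
{
    private string _pending;

    // Stores the value, replacing any pending one.
    // Returns true when the slot was empty, i.e. the caller must schedule a drain.
    public bool Offer(string value)
    {
        return Interlocked.Exchange(ref _pending, value) == null;
    }

    // Atomically takes whatever is pending and empties the slot.
    public string Take()
    {
        return Interlocked.Exchange(ref _pending, null);
    }
}

public static class PendingBoxDemo
{
    public static void Main()
    {
        var box = new PendingBox();
        Console.WriteLine(box.Offer("a")); // True: nothing was pending, schedule a drain
        Console.WriteLine(box.Offer("b")); // False: a drain is already scheduled, "a" is replaced
        Console.WriteLine(box.Take());     // b
        Console.WriteLine(box.Offer("c")); // True: the slot was emptied, schedule again
    }
}
```

Because the producer both stores the new value and learns whether a drain is already outstanding in a single atomic step, no lock is needed around the pending slot.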
An example using Observable.Switch. It also handles the case where you finish a task but there is nothing in the queue.
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive
{
    public static class RXX
    {
        public static IDisposable SubscribeWithoutOverlap<T>
            ( this IObservable<T> source
            , Action<T> action
            , IScheduler scheduler = null)
        {
            var sampler = new Subject<Unit>();
            scheduler = scheduler ?? Scheduler.Default;
            var p = source.Publish();
            var connection = p.Connect();
            var subscription = sampler.Select(x => p.Take(1))
                .Switch()
                .ObserveOn(scheduler)
                .Subscribe(l =>
                {
                    action(l);
                    sampler.OnNext(Unit.Default);
                });
            sampler.OnNext(Unit.Default);
            return new CompositeDisposable(connection, subscription);
        }
    }
}
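Just finished (and already completely revised) my own solution to the problem, which I plan to use in production.
Unless the scheduler uses the current thread, calls to OnNext, OnCompleted and OnError from the source should return immediately; if the observer is busy with previous notifications, they go into a queue with a specifiable maximum size, from which the observer is notified whenever the previous notification has been processed. If the queue fills up, the least recent items are discarded. So a maximum queue size of 0 ignores all items that come in while the observer is busy; a size of 1 always lets the observer see the latest item; a size of up to int.MaxValue keeps the consumer busy until it catches up with the producer.
If the scheduler supports long running (i.e. gives you a thread of your own), I schedule a loop to notify the observer; otherwise I use recursive scheduling.
Here's the code. Any comments are appreciated.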
partial class MoreObservables
{
/// <summary>
/// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
/// </summary>
/// <param name="source">The source sequence.</param>
/// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
/// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
/// <remarks>
/// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
/// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
/// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
/// </remarks>
public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
if (scheduler == null) scheduler = Scheduler.Default;
return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer));
}
private static class LatestImpl<TSource>
{
public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
var longrunningScheduler = scheduler.AsLongRunning();
if (longrunningScheduler != null)
return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer);
return new RecursiveSubscription(source, maxQueueSize, scheduler, observer);
}
#region Subscriptions
/// <summary>
/// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop.
/// </summary>
private sealed class LoopSubscription : IDisposable
{
private enum State
{
Idle, // nothing to notify
Head, // next notification is in _head
Queue, // next notifications are in _queue, followed by _completion
Disposed, // disposed
}
private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
private readonly IObserver<TSource> _observer;
private State _state;
private TSource _head; // item in front of the queue
private IQueue _queue; // queued items
private Notification<TSource> _completion; // completion notification
public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer)
{
_observer = observer;
_queue = Queue.Create(maxQueueSize);
scheduler.ScheduleLongRunning(_ => Loop());
_subscription.Disposable = source.Subscribe(
OnNext,
error => OnCompletion(Notification.CreateOnError<TSource>(error)),
() => OnCompletion(Notification.CreateOnCompleted<TSource>()));
}
private void OnNext(TSource value)
{
lock (_subscription)
{
switch (_state)
{
case State.Idle:
_head = value;
_state = State.Head;
Monitor.Pulse(_subscription);
break;
case State.Head:
case State.Queue:
if (_completion != null) return;
try { _queue.Enqueue(value); }
catch (Exception error) // probably OutOfMemoryException
{
_completion = Notification.CreateOnError<TSource>(error);
_subscription.Dispose();
}
break;
}
}
}
private void OnCompletion(Notification<TSource> completion)
{
lock (_subscription)
{
switch (_state)
{
case State.Idle:
_completion = completion;
_state = State.Queue;
Monitor.Pulse(_subscription);
_subscription.Dispose();
break;
case State.Head:
case State.Queue:
if (_completion != null) return;
_completion = completion;
_subscription.Dispose();
break;
}
}
}
public void Dispose()
{
lock (_subscription)
{
if (_state == State.Disposed) return;
_head = default(TSource);
_queue = null;
_completion = null;
_state = State.Disposed;
Monitor.Pulse(_subscription);
_subscription.Dispose();
}
}
private void Loop()
{
try
{
while (true) // overall loop for all notifications
{
// next notification to emit
Notification<TSource> completion;
TSource next; // iff completion == null
lock (_subscription)
{
while (true)
{
while (_state == State.Idle)
Monitor.Wait(_subscription);
if (_state == State.Head)
{
completion = null;
next = _head;
_head = default(TSource);
_state = State.Queue;
break;
}
if (_state == State.Queue)
{
if (!_queue.IsEmpty)
{
completion = null;
next = _queue.Dequeue(); // assumption: this never throws
break;
}
if (_completion != null)
{
completion = _completion;
next = default(TSource);
break;
}
_state = State.Idle;
continue;
}
Debug.Assert(_state == State.Disposed);
return;
}
}
if (completion != null)
{
completion.Accept(_observer);
return;
}
_observer.OnNext(next);
}
}
finally { Dispose(); }
}
}
/// <summary>
/// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively.
/// </summary>
private sealed class RecursiveSubscription : IDisposable
{
private enum State
{
Idle, // nothing to notify
Scheduled, // emitter scheduled or executing
Disposed, // disposed
}
private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action
private readonly IScheduler _scheduler;
private readonly IObserver<TSource> _observer;
private State _state;
private IQueue _queue; // queued items
private Notification<TSource> _completion; // completion notification
public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
{
_scheduler = scheduler;
_observer = observer;
_queue = Queue.Create(maxQueueSize);
_subscription.Disposable = source.Subscribe(
OnNext,
error => OnCompletion(Notification.CreateOnError<TSource>(error)),
() => OnCompletion(Notification.CreateOnCompleted<TSource>()));
}
private void OnNext(TSource value)
{
lock (_subscription)
{
switch (_state)
{
case State.Idle:
_emitter.Disposable = _scheduler.Schedule(value, EmitNext);
_state = State.Scheduled;
break;
case State.Scheduled:
if (_completion != null) return;
try { _queue.Enqueue(value); }
catch (Exception error) // probably OutOfMemoryException
{
_completion = Notification.CreateOnError<TSource>(error);
_subscription.Dispose();
}
break;
}
}
}
private void OnCompletion(Notification<TSource> completion)
{
lock (_subscription)
{
switch (_state)
{
case State.Idle:
_completion = completion;
_emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion));
_state = State.Scheduled;
_subscription.Dispose();
break;
case State.Scheduled:
if (_completion != null) return;
_completion = completion;
_subscription.Dispose();
break;
}
}
}
public void Dispose()
{
lock (_subscription)
{
if (_state == State.Disposed) return;
_emitter.Dispose();
_queue = null;
_completion = null;
_state = State.Disposed;
_subscription.Dispose();
}
}
private void EmitNext(TSource value, Action<TSource> self)
{
try { _observer.OnNext(value); }
catch { Dispose(); return; }
lock (_subscription)
{
if (_state == State.Disposed) return;
Debug.Assert(_state == State.Scheduled);
if (!_queue.IsEmpty)
self(_queue.Dequeue());
else if (_completion != null)
_emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion));
else
_state = State.Idle;
}
}
private void EmitCompletion(Notification<TSource> completion)
{
try { completion.Accept(_observer); }
finally { Dispose(); }
}
}
#endregion
#region IQueue
/// <summary>
/// FIFO queue that discards least recent items if size limit is reached.
/// </summary>
private interface IQueue
{
bool IsEmpty { get; }
void Enqueue(TSource item);
TSource Dequeue();
}
/// <summary>
/// <see cref="IQueue"/> implementations.
/// </summary>
private static class Queue
{
public static IQueue Create(int maxSize)
{
switch (maxSize)
{
case 0: return Zero.Instance;
case 1: return new One();
default: return new Many(maxSize);
}
}
private sealed class Zero : IQueue
{
// ReSharper disable once StaticMemberInGenericType
public static Zero Instance { get; } = new Zero();
private Zero() { }
public bool IsEmpty => true;
public void Enqueue(TSource item) { }
public TSource Dequeue() { throw new InvalidOperationException(); }
}
private sealed class One : IQueue
{
private TSource _item;
public bool IsEmpty { get; private set; } = true;
public void Enqueue(TSource item)
{
_item = item;
IsEmpty = false;
}
public TSource Dequeue()
{
if (IsEmpty) throw new InvalidOperationException();
var item = _item;
_item = default(TSource);
IsEmpty = true;
return item;
}
}
private sealed class Many : IQueue
{
private readonly int _maxSize, _initialSize;
private int _deq, _enq; // indices of dequeue and enqueue positions
private TSource[] _buffer;
public Many(int maxSize)
{
if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize));
_maxSize = maxSize;
if (maxSize == int.MaxValue)
_initialSize = 4;
else
{
// choose an initial size that won't get us too close to maxSize when doubling
_initialSize = maxSize;
while (_initialSize >= 7)
_initialSize = (_initialSize + 1) / 2;
}
}
public bool IsEmpty { get; private set; } = true;
public void Enqueue(TSource item)
{
if (IsEmpty)
{
if (_buffer == null) _buffer = new TSource[_initialSize];
_buffer[0] = item;
_deq = 0;
_enq = 1;
IsEmpty = false;
return;
}
if (_deq == _enq) // full
{
if (_buffer.Length == _maxSize) // overwrite least recent
{
_buffer[_enq] = item;
if (++_enq == _buffer.Length) _enq = 0;
_deq = _enq;
return;
}
// increase buffer size
var newSize = _buffer.Length >= _maxSize / 2 ? _maxSize : 2 * _buffer.Length;
var newBuffer = new TSource[newSize];
var count = _buffer.Length - _deq;
Array.Copy(_buffer, _deq, newBuffer, 0, count);
Array.Copy(_buffer, 0, newBuffer, count, _deq);
_deq = 0;
_enq = _buffer.Length;
_buffer = newBuffer;
}
_buffer[_enq] = item;
if (++_enq == _buffer.Length) _enq = 0;
}
public TSource Dequeue()
{
if (IsEmpty) throw new InvalidOperationException();
var result = ReadAndClear(ref _buffer[_deq]);
if (++_deq == _buffer.Length) _deq = 0;
if (_deq == _enq)
{
IsEmpty = true;
if (_buffer.Length > _initialSize) _buffer = null;
}
return result;
}
private static TSource ReadAndClear(ref TSource item)
{
var result = item;
item = default(TSource);
return result;
}
}
}
#endregion
}
}
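The effect of the maximum queue size is easier to see with a much simpler stand-in for the queue. The sketch below is not the implementation above: `DropOldestQueue<T>` is a hypothetical class built on the BCL's Queue<T>, and it only mimics the "discard the least recent item when full" rule for items that arrive while the observer is busy.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simplified stand-in for the IQueue implementations above.
public sealed class DropOldestQueue<T>
{
    private readonly int _maxSize;
    private readonly Queue<T> _items = new Queue<T>();

    public DropOldestQueue(int maxSize)
    {
        if (maxSize < 0) throw new ArgumentOutOfRangeException(nameof(maxSize));
        _maxSize = maxSize;
    }

    public bool IsEmpty => _items.Count == 0;

    public void Enqueue(T item)
    {
        if (_maxSize == 0) return;                       // size 0: nothing is ever kept
        if (_items.Count == _maxSize) _items.Dequeue();  // full: discard the least recent item
        _items.Enqueue(item);
    }

    public T Dequeue() => _items.Dequeue(); // throws InvalidOperationException when empty
}

public static class DropOldestQueueDemo
{
    // Enqueues 1..5 as if the observer were busy the whole time, then drains the queue.
    public static string Run(int maxSize)
    {
        var queue = new DropOldestQueue<int>(maxSize);
        for (int i = 1; i <= 5; i++) queue.Enqueue(i);
        var seen = new List<int>();
        while (!queue.IsEmpty) seen.Add(queue.Dequeue());
        return string.Join(",", seen);
    }

    public static void Main()
    {
        Console.WriteLine("[" + Run(0) + "]"); // []
        Console.WriteLine("[" + Run(1) + "]"); // [5]
        Console.WriteLine("[" + Run(3) + "]"); // [3,4,5]
    }
}
```

With a size of 1 only the latest item survives, which is the behaviour asked for in the question. The item currently being processed is held outside the queue in both the sketch and the code above.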
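Here's a Task-based implementation with cancellation semantics that doesn't use subjects. Calling Dispose lets the subscribed action cancel its processing, if so desired.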
public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action)
{
    var cancellation = new CancellationDisposable();
    var token = cancellation.Token;
    Task task = null;
    return new CompositeDisposable(
        cancellation,
        observable.Subscribe(value =>
        {
            if (task == null || task.IsCompleted)
                task = Task.Factory.StartNew(() => action(value, token), token);
        })
    );
}
Here's a simple test:
Observable.Interval(TimeSpan.FromMilliseconds(150))
    .SampleSubscribe((v, ct) =>
    {
        // check for cancellation, do work
        for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++)
            Thread.Sleep(100);
        Console.WriteLine(v);
    });
Output:
0
7
14
21
28
35
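With Rx 2.0 RC, you can use Chunkify to get an IEnumerable of lists, each containing what was observed since the last MoveNext.
You can then use ToObservable to convert this back to an IObservable and only pay attention to the last entry in each non-empty list.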
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
messages.Chunkify()
    .ToObservable(Scheduler.TaskPool)
    .Where(list => list.Any())
    .Select(list => list.Last())
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });
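Yet another solution.
It isn't pretty, because it mixes Task and Observable, so it isn't really testable with ReactiveTest (though to be honest, I'm not sure how I'd implement a 'slow' subscriber with ReactiveTest either).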
public static IObservable<T> ShedLoad<T>(this IObservable<T> source)
{
    return Observable.Create<T>(observer =>
    {
        Task task = Task.FromResult(0);
        return source.Subscribe(t =>
        {
            if (task.IsCompleted)
                task = Task.Run(() => observer.OnNext(t));
            else
                Debug.WriteLine("Skip, task not finished");
        }, observer.OnError, observer.OnCompleted);
    });
}
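I'm guessing there may be race conditions in there, but in my opinion, if we're at the stage of discarding things because they arrive too fast, I don't mind discarding one too many or one too few. Oh, and each OnNext is (potentially) called on a different thread (I guess I could put a Synchronize on the back of the Create).
I admit I couldn't get the Materialize extension to work properly (I hooked it up to FromEventPattern(MouseMove) and then subscribed with a deliberately slow subscriber, and strangely it let bursts of events through rather than one at a time).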