我正在尝试编写一个允许有限并发的Scheduler。这是我写的。
public class ThreeAtATimeScheduler : IScheduler
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(3);
public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
var scheduler = ThreadPoolScheduler.Instance;
return new CompositeDisposable(
Disposable.Create(() => _semaphore.Release()),
_semaphore.WaitAsync().
ContinueWith(t => scheduler.Schedule(state, action))
);
}
...
}
这是我尝试使用它的方式(在我的用例中,我必须先积累一些消息,然后才能开始批量处理)。
var scheduler = new ThreeAtATimeScheduler();
var subscription =
Observable.Range(0, 100)
.Buffer(TimeSpan.FromSeconds(1), 10)
.SelectMany(x => x)
.Select(x => Observable.Start(() =>
{
Console.WriteLine($"Performing for Thread {Thread.CurrentThread.ManagedThreadId}");
}, scheduler))
.Concat()
.Subscribe();
我的程序显示以下输出并在几秒钟后结束。
Performing for Thread 3
Performing for Thread 5
Performing for Thread 8
我调试并发现没有任何信号量被释放。
如果有更好的方法来编写这个调度程序,我很想知道它,但我也很想知道为什么我的调度程序不起作用。
我的猜测是,必须在返回的 Disposable 上显式调用 Dispose,但是 Observable.Start() 实现会丢弃您的一次性对象并返回它自己的空对象,并且垃圾收集器从不调用 Dispose,因此 Release 永远不会发生。
对于我来说,你不需要这里的自定义调度程序。您可以纯粹通过 Rx linq 方法限制并发,如下所示:
Observable.Range(0, 100)
.Buffer(10)
.Select(x => Observable.Start(() =>
{
Console.WriteLine($"Performing for Thread {Thread.CurrentThread.ManagedThreadId}");
return x.Sum();
}))
.Merge(3) // <- degree of parallelism
.Subscribe();
该示例显示了从输入缓冲区计算总和。