Rx - 一次只允许 n 个并行操作的调度程序

问题描述 投票:0回答:1

我正在尝试编写一个允许有限并发的Scheduler。这是我写的。

public class ThreeAtATimeScheduler : IScheduler
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(3);

public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
    var scheduler = ThreadPoolScheduler.Instance;
    return new CompositeDisposable(
        Disposable.Create(() => _semaphore.Release()),
        _semaphore.WaitAsync().
        ContinueWith(t => scheduler.Schedule(state, action))
    );
 }

...
}

这是我尝试使用它的方式(在我的用例中,我必须先积累一些消息,然后才能开始批量处理)。

var scheduler = new ThreeAtATimeScheduler();
var subscription = 
Observable.Range(0, 100)
    .Buffer(TimeSpan.FromSeconds(1), 10)
    .SelectMany(x => x)
    .Select(x => Observable.Start(() =>
    {
        Console.WriteLine($"Performing for Thread {Thread.CurrentThread.ManagedThreadId}");
    }, scheduler))
    .Concat()
    .Subscribe();

我的程序显示以下输出并在几秒钟后结束。

Performing for Thread 3
Performing for Thread 5
Performing for Thread 8

我调试并发现没有任何信号量被释放。

如果有更好的方法来编写这个调度程序,我很想知道它,但我也很想知道为什么我的调度程序不起作用。

我的猜测是,必须在返回的 Disposable 上显式调用 Dispose,但是 Observable.Start() 实现会丢弃您的一次性对象并返回它自己的空对象,并且垃圾收集器从不调用 Dispose,因此 Release 永远不会发生。

c# system.reactive
1个回答
0
投票

对于我来说,你不需要这里的自定义调度程序。您可以纯粹通过 Rx linq 方法限制并发,如下所示:

    Observable.Range(0, 100)
        .Buffer(10)
        .Select(x => Observable.Start(() =>
        {
            Console.WriteLine($"Performing for Thread {Thread.CurrentThread.ManagedThreadId}");
            return x.Sum();
        }))
        .Merge(3) // <- degree of parallelism
        .Subscribe();

该示例显示了从输入缓冲区计算总和。

© www.soinside.com 2019 - 2024. All rights reserved.