为SemaphoreSlim执行Starve方法(“ Unrelease” /“ Hold”)

问题描述 投票:1回答:1

我正在使用SemaphoreSlim with a FIFO behaviour,现在我想向其添加Starve(int amount)方法以从池中删除线程,这与Release()相反。

[如果有任何正在运行的任务,它们当然会一直持续到完成为止,因为此刻信号灯无法跟踪实际正在运行的内容,并且“欠”​​该信号灯一个释放调用。

原因是用户将随时动态控制给定信号量所允许的进程数。

我遵循的策略是:

  • [如果有可用线程,即CurrentCount > 0,则在不释放回来的情况下在SemaphoreSlim上调用Await()
  • 如果没有更多可用线程,因为可能正在运行任务,甚至可能排队,那么下次调用Release()时,请忽略它以防止线程被释放(一个int变量保持计数)

我已经添加了到目前为止我在下面提供的代码。我正在努力解决的主要问题是如何确保线程安全,没有死锁以及没有令人惊讶的竞争条件。

鉴于我无法访问信号量的private lock(),所以我创建了一个新对象,至少试图防止多个线程同时(在包装器内)操纵新变量。

[但是,我担心SemaphoreSlim中的其他变量,例如CurrentCount,也可能会改变一半,并使事情变得混乱...我希望Release()方法中的锁定可以防止对CurrentCount的更改,但是也许我也应该将锁应用于Wait和WaitAsync(这也可能会更改CurrentCount)?这也可能导致两次调用Wait(?)

之间不必要的锁定

在这种情况下,对semaphore.Wait()的呼叫比await semaphore.WaitAsync()好还是差?

是否有更好的方法来扩展诸如SemaphoreSlim之类的类的功能,该类包含许多可能需要或可能对访问有用的私有变量?

我简要地考虑过创建一个从SemaphoreSlim继承的新类,或者研究扩展方法,也许使用反射来访问私有变量,但是似乎没有一个明显或有效的。

    public class SemaphoreQueue
    {
        private SemaphoreSlim semaphore;
        private ConcurrentQueue<TaskCompletionSource<bool>> queue = new ConcurrentQueue<TaskCompletionSource<bool>>();

        private int releasesToIgnore;
        private object lockObj;
        private const int NO_MAXIMUM = Int32.MaxValue; // cannot access SemaphoreSlim.NO_MAXIMUM
        public SemaphoreQueue(int initialCount) : this(initialCount, NO_MAXIMUM) { }
        public SemaphoreQueue(int initialCount, int maxCount)
        {
            semaphore = new SemaphoreSlim(initialCount, maxCount);
            lockObj = new object();
            releasesToIgnore = 0;
        }
        public void Starve(int amount)
        {
            lock (lockObj)
            {
                // a maximum of CurrentCount threads can be immediatelly starved by calling Wait without release
                while ((semaphore.CurrentCount > 0) && (amount > 0))
                {
                    semaphore.Wait();
                    amount -= 1;
                }
                // presumably there are still tasks running. The next Releases will be ignored.
                if (amount > 0)
                    releasesToIgnore += amount;
            }
        }
        public int Release()
        {
            return Release(1);
        }
        public int Release(int num)
        {
            lock (lockObj)
            {
                if (releasesToIgnore > num)
                {
                    releasesToIgnore -= num;
                    return semaphore.CurrentCount;
                }
                else
                {
                    int oldReleasesToIgnore = releasesToIgnore;
                    releasesToIgnore = 0;
                    return semaphore.Release(num - oldReleasesToIgnore);
                }
            }
        }
        public void Wait(CancellationToken token)
        {
            WaitAsync(token).Wait();
        }
        public Task WaitAsync(CancellationToken token)
        {
            var tcs = new TaskCompletionSource<bool>();
            queue.Enqueue(tcs);
            QueuedAwait(token);
            return tcs.Task;
        }
        public int CurrentCount { get => this.semaphore.CurrentCount; }
        private void QueuedAwait(CancellationToken token)
        {
            semaphore.WaitAsync(token).ContinueWith(t =>
            {
                TaskCompletionSource<bool> popped;
                if (queue.TryDequeue(out popped))
                    popped.SetResult(true);
            });
        }

        public void Dispose()
        {
            semaphore.Dispose();
        }
    }
c# multithreading concurrency locking race-condition
1个回答
0
投票

我认为在SemaphoreSlim类的顶部实现自定义信号量是有问题的,因为我们无法访问内置实现所使用的同步原语。因此,我建议仅使用SemaphoreSlim对象队列来实现它。以下是缺少功能的基本实现。 TaskCompletionSource方法缺少取消功能,WaitAsync方法也缺少Release参数。

为了简单起见,不使用releaseCount计数器,而是允许现有的releasesToIgnore具有负值。 currentCount方法只会减少此计数器。

Starve
© www.soinside.com 2019 - 2024. All rights reserved.