我正在使用SemaphoreSlim
with a FIFO behaviour,现在我想向其添加Starve(int amount)
方法以从池中删除线程,这与Release()
相反。
[如果有任何正在运行的任务,它们当然会一直持续到完成为止,因为此刻信号灯无法跟踪实际正在运行的内容,并且“欠”该信号灯一个释放调用。
原因是用户将随时动态控制给定信号量所允许的进程数。
我遵循的策略是:
CurrentCount > 0
,则在不释放回来的情况下在SemaphoreSlim上调用Await()
。Release()
时,请忽略它以防止线程被释放(一个int变量保持计数)我已经添加了到目前为止我在下面提供的代码。我正在努力解决的主要问题是如何确保线程安全,没有死锁以及没有令人惊讶的竞争条件。
鉴于我无法访问信号量的private lock(),所以我创建了一个新对象,至少试图防止多个线程同时(在包装器内)操纵新变量。
[但是,我担心SemaphoreSlim中的其他变量,例如CurrentCount
,也可能会改变一半,并使事情变得混乱...我希望Release()
方法中的锁定可以防止对CurrentCount
的更改,但是也许我也应该将锁应用于Wait和WaitAsync(这也可能会更改CurrentCount)?这也可能导致两次调用Wait(?)
在这种情况下,对semaphore.Wait()
的呼叫比await semaphore.WaitAsync()
好还是差?
是否有更好的方法来扩展诸如SemaphoreSlim之类的类的功能,该类包含许多可能需要或可能对访问有用的私有变量?
我简要地考虑过创建一个从SemaphoreSlim继承的新类,或者研究扩展方法,也许使用反射来访问私有变量,但是似乎没有一个明显或有效的。
public class SemaphoreQueue
{
private SemaphoreSlim semaphore;
private ConcurrentQueue<TaskCompletionSource<bool>> queue = new ConcurrentQueue<TaskCompletionSource<bool>>();
private int releasesToIgnore;
private object lockObj;
private const int NO_MAXIMUM = Int32.MaxValue; // cannot access SemaphoreSlim.NO_MAXIMUM
public SemaphoreQueue(int initialCount) : this(initialCount, NO_MAXIMUM) { }
public SemaphoreQueue(int initialCount, int maxCount)
{
semaphore = new SemaphoreSlim(initialCount, maxCount);
lockObj = new object();
releasesToIgnore = 0;
}
public void Starve(int amount)
{
lock (lockObj)
{
// a maximum of CurrentCount threads can be immediatelly starved by calling Wait without release
while ((semaphore.CurrentCount > 0) && (amount > 0))
{
semaphore.Wait();
amount -= 1;
}
// presumably there are still tasks running. The next Releases will be ignored.
if (amount > 0)
releasesToIgnore += amount;
}
}
public int Release()
{
return Release(1);
}
public int Release(int num)
{
lock (lockObj)
{
if (releasesToIgnore > num)
{
releasesToIgnore -= num;
return semaphore.CurrentCount;
}
else
{
int oldReleasesToIgnore = releasesToIgnore;
releasesToIgnore = 0;
return semaphore.Release(num - oldReleasesToIgnore);
}
}
}
public void Wait(CancellationToken token)
{
WaitAsync(token).Wait();
}
public Task WaitAsync(CancellationToken token)
{
var tcs = new TaskCompletionSource<bool>();
queue.Enqueue(tcs);
QueuedAwait(token);
return tcs.Task;
}
public int CurrentCount { get => this.semaphore.CurrentCount; }
private void QueuedAwait(CancellationToken token)
{
semaphore.WaitAsync(token).ContinueWith(t =>
{
TaskCompletionSource<bool> popped;
if (queue.TryDequeue(out popped))
popped.SetResult(true);
});
}
public void Dispose()
{
semaphore.Dispose();
}
}
我认为在SemaphoreSlim
类的顶部实现自定义信号量是有问题的,因为我们无法访问内置实现所使用的同步原语。因此,我建议仅使用SemaphoreSlim
对象队列来实现它。以下是缺少功能的基本实现。 TaskCompletionSource
方法缺少取消功能,WaitAsync
方法也缺少Release
参数。
为了简单起见,不使用releaseCount
计数器,而是允许现有的releasesToIgnore
具有负值。 currentCount
方法只会减少此计数器。
Starve