假设您有以下接口:
interface IConflateWorkByKey<TKey, TValue>
{
IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues();
void Publish(TKey key, TValue value);
}
这旨在描述类似工作队列的模式。生产者可以通过
Publish
对项目进行排队,单个消费者可以通过 GetValues
排出项目。语义如下:如果先调用 Publish(1, "hello")
,然后调用 Publish(1, "world")
,那么消费者收到的下一个字典应该包含 key: 1、value: "world"。先前的更新 (1, "hello") 将被丢弃,永远不会被消费者看到。

Publish
的调用速度通常比工作消耗的速度快得多。换句话说,对 Publish
的调用应该尽可能快,因此排出项目对性能来说并不那么关键。

通常,当推进 GetValues
返回的迭代器时,新项目已经可用,不需要实际等待;针对这种情况做快速路径优化会很有用。但是,实现也需要准备好应对没有新项目可用的情况,此时应异步等待新项目到来。只会有一个消费者(即 GetValues 只会被调用一次)。下面是一个示例(非异步)实现:
class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
{
private Dictionary<TKey,TValue>? _buffered = null;
private readonly object _lock = new();
public IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(CancellationToken ct)
{
while(!ct.IsCancellationRequested)
{
lock(_lock)
{
while(_buffered is null)
Monitor.Wait(_lock);
var result = _buffered;
_buffered = null;
yield return result;
}
}
}
public void Publish(TKey key, TValue value)
{
lock(_lock)
{
_buffered ??= new();
_buffered[key] = value;
Monitor.Pulse(_lock);
}
}
}
请注意,如果对于特定实现来说这是最佳的,我愿意更改
Publish
方法以返回
ValueTask
。原则上这是可行的,但这里的主要问题是 GetValues
的实现并不是异步的:调用线程实际上被阻塞在
Monitor.Wait
上。我也尝试过用
Nito.AsyncEx
中的 AsyncMonitor
来实现这种模式——但不幸的是,AsyncMonitor.Pulse
明显太慢了。有没有人能想到一种更聪明的实现/模式:既能让发布值的速度极快,又允许在
GetValues
内部实现真正的异步等待/信号通知?
编辑:这是另一个想法。我还没有考虑过这是否正确,也没有衡量性能,但在这里列出来供大家讨论。当然仍然对其他想法感到好奇!class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
{
private Dictionary<TKey,TValue>? _buffered = new();
private readonly object _lock = new();
private TaskCompletionSource? _tcs = null;
public IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(CancellationToken ct)
{
while(!ct.IsCancellationRequested)
{
Dictionary<TKey,TValue> result;
while(true) {
lock(_lock)
{
if(_buffered.Any())
{
// "Fast path" - next result is already available, publish directly without having to wait
result = _buffered;
_buffered = new();
break;
}
_tcs = new();
}
await _tcs.Task;
}
yield return result;
}
}
public void Publish(TKey key, TValue value)
{
lock(_lock)
{
_buffered[key] = value;
if(_tcs is not null)
{
_tcs.TrySetResult();
_tcs = null; // "Fast path", next invocation of publish doesn't even need to call TrySetResult() if values weren't drained in between
}
}
}
}
下面是该接口的一个实现,它使用 TaskCompletionSource 来应对消费者在字典为空时尝试消费一批的情况:
class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue>
{
private readonly object _locker = new();
private TaskCompletionSource<Dictionary<TKey, TValue>> _tcs;
private Dictionary<TKey, TValue> _dictionary;
public void Publish(TKey key, TValue value)
{
lock (_locker)
{
if (_tcs is not null)
{
Debug.Assert(_dictionary is null);
_tcs.SetResult(new() { { key, value } });
_tcs = null;
}
else
{
_dictionary ??= new();
_dictionary[key] = value;
}
}
}
public async IAsyncEnumerable<Dictionary<TKey, TValue>> GetValues(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
using CancellationTokenRegistration ctr = cancellationToken.Register(() =>
{
lock (_locker)
{
if (_tcs is not null)
{
_tcs.SetCanceled(cancellationToken);
_tcs = null;
}
}
});
while (true)
{
Dictionary<TKey, TValue> result = null;
Task<Dictionary<TKey, TValue>> taskResult = null;
lock (_locker)
{
if (_tcs is not null) throw new InvalidOperationException(
"Multiple consumers are not supported.");
cancellationToken.ThrowIfCancellationRequested();
if (_dictionary is not null)
{
result = _dictionary;
_dictionary = null;
}
else
{
_tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
taskResult = _tcs.Task;
}
}
if (result is not null)
yield return result;
else if (taskResult is not null)
yield return await taskResult.ConfigureAwait(false);
}
}
}
此实现支持多个并发生产者,最多一个消费者。
理想情况下,您需要一个像
PublishCompleted
这样的额外方法,以便生产者可以发出不再生产任何物品的信号,从而允许消费者在处理完所有批次后退出
await foreach
循环。 CancellationToken
对于这个角色来说并不理想,因为它会突然停止消费者,可能在消费最后一批之前。至少这是 CancellationToken
的预期行为。作为对比,可以参考类似 .NET 组件中的相应方法,例如 BlockingCollection&lt;T&gt; 的
CompleteAdding
和 ChannelWriter&lt;T&gt; 的
Complete
方法。