高性能融合生产者/消费者模式

问题描述 投票:0回答:1

假设您有以下界面:

interface IConflateWorkByKey<TKey, TValue>
{
  IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues();
  void Publish(TKey key, TValue value);
}

这旨在描述类似工作队列的模式。生产者可以通过

Publish
对项目进行排队,单个消费者可以通过
GetValues
排出项目,从而:

  • 值应按键合并。意思是,如果生产者在消费者有机会耗尽下一批(字典)之前调用
    Publish(1, "hello")
    ,然后调用
    Publish(1, "world")
    ,那么消费者收到的下一个字典应该是 key: 1,value: “world”。先前的更新(1,“你好”)将被删除/永远不会被消费者看到
  • Publish
    的调用速度通常比工作消耗的速度要快得多。换句话说,对
    Publish
    的调用应该尽可能快,因此排出项目对性能来说并不是那么关键。
  • 很可能在大多数实际情况下,当工作使用者继续从 GetValues 返回的迭代器时,新项目已经可用,不需要实际等待;对于这种情况进行快速路径优化会很有用。但是,需要准备一个实现来防止这种情况发生,然后异步等待新项目可用
    只会有一个消费者(即:
  • GetValues
  • 只会被1个消费者调用/消费)
    Publish 不会被并发调用(尽管它可能从不同的线程顺序调用)
  • 我当前的实现如下:

class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue> { private Dictionary<TKey,TValue>? _buffered = null; private readonly object _lock = new(); public IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(CancellationToken ct) { while(!ct.IsCancellationRequested) { lock(_lock) { while(_buffered is null) Monitor.Wait(_lock); var result = _buffered; _buffered = null; yield return result; } } } public void Publish(TKey key, TValue value) { lock(_lock) { _buffered ??= new(); _buffered[key] = value; Monitor.Pulse(_lock); } } }

请注意,如果对于特定实现来说这是最佳的,我愿意更改 
Publish

方法以返回

ValueTask
原则上这是可行的,但主要问题是这里的

GetValues

的实现不是异步的;调用线程在

Monitor.Wait
上被正确阻塞。
我也用 

AsyncMonitor

中的

Nito.AsyncEx
尝试过这种模式 - 但不幸的是,
AsyncMonitor.Pulse
明显
太慢了。 任何人都可以想到一种更聪明的实现/模式,它在发布价值方面速度极快,同时允许从内部实现真正的异步等待/信号发送

GetValues


编辑:这是另一个想法。我还没有考虑过这是否正确,也没有衡量性能,但在这里列出来供大家讨论。当然仍然对其他想法感到好奇!

class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue> { private Dictionary<TKey,TValue>? _buffered = new(); private readonly object _lock = new(); private TaskCompletionSource? _tcs = null; public IAsyncEnumerable<Dictionary<TKey,TValue>> GetValues(CancellationToken ct) { while(!ct.IsCancellationRequested) { Dictionary<TKey,TValue> result; while(true) { lock(_lock) { if(_buffered.Any()) { // "Fast path" - next result is already available, publish directly without having to wait result = _buffered; _buffered = new(); break; } _tcs = new(); } await _tcs.Task; } yield return result; } } public void Publish(TKey key, TValue value) { lock(_lock) { _buffered[key] = value; if(_tcs is not null) { _tcs.TrySetResult(); _tcs = null; // "Fast path", next invocation of publish doesn't even need to call TrySetResult() if values weren't drained in between } } } }


c# asynchronous concurrency producer-consumer iasyncenumerable
1个回答
0
投票

TaskCompletionSource<T>

 来应对消费者尝试在字典为空时消费一批的情况:
class Conflate<TKey, TValue> : IConflateWorkByKey<TKey, TValue> { private readonly object _locker = new(); private TaskCompletionSource<Dictionary<TKey, TValue>> _tcs; private Dictionary<TKey, TValue> _dictionary; public void Publish(TKey key, TValue value) { lock (_locker) { if (_tcs is not null) { Debug.Assert(_dictionary is null); _tcs.SetResult(new() { { key, value } }); _tcs = null; } else { _dictionary ??= new(); _dictionary[key] = value; } } } public async IAsyncEnumerable<Dictionary<TKey, TValue>> GetValues( [EnumeratorCancellation] CancellationToken cancellationToken = default) { using CancellationTokenRegistration ctr = cancellationToken.Register(() => { lock (_locker) { if (_tcs is not null) { _tcs.SetCanceled(cancellationToken); _tcs = null; } } }); while (true) { Dictionary<TKey, TValue> result = null; Task<Dictionary<TKey, TValue>> taskResult = null; lock (_locker) { if (_tcs is not null) throw new InvalidOperationException( "Multiple consumers are not supported."); cancellationToken.ThrowIfCancellationRequested(); if (_dictionary is not null) { result = _dictionary; _dictionary = null; } else { _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); taskResult = _tcs.Task; } } if (result is not null) yield return result; else if (taskResult is not null) yield return await taskResult.ConfigureAwait(false); } } }

此实现支持多个并发生产者,最多一个消费者。

理想情况下,您需要一个像

PublishCompleted

这样的额外方法,以便生产者可以发出不再生产任何物品的信号,从而允许消费者在处理完所有批次后退出

await foreach
循环。
CancellationToken
对于这个角色来说并不理想,因为它会突然停止消费者,可能在消费最后一批之前。至少这是
CancellationToken
的预期行为。例如,比较类似 .NET 组件的
CompleteAdding
Complete
 方法。

© www.soinside.com 2019 - 2024. All rights reserved.