TL;博士:
在 C# 中实现此模式的有效方法是什么?
更详细一点:
作为一个心理示例,假设我们正在设计一个内存数据库。观察者需要能够等待他们所关心的任何行子集的更改。因此,对于这个问题,界面可能看起来很简单
interface ITable<TPrimaryKey, TRow>
{
Task WaitUntilAnyRowChanged(IReadOnlySet<TPrimaryKey> keysToListenTo, CancellationToken ct);
}
到目前为止,我想到的是:
想法 1:行上的 TaskCompletionSource
将
TaskCompletionSource
与每一行相关联。当值发生变化时,系统将完成与该行关联的现有 TaskCompletionSource
,并在下一次挂起的更新中将其替换为新的。然后,消费者(即 WaitUntilAnyRowChanged
)可以对所有相关的内容进行 Task.WhenAny(...)
。
缺点:
Task.WhenAny(...)
链接到数千个 TaskCompletionSource
-s 上效率非常低想法 2:观察者上的 AsyncAutoResetEvent (或类似的同步结构)
我们交换角色。每行都有一个
List<AsyncAutoResetEvent>
。每个观察者都会创建一个自己的 AsyncAutoResetEvent
,并将其注册到他们感兴趣的所有行。写入时,一行将为所有侦听器设置信号。
缺点:
想法 3:观察者聆听所有变化
观察者可以监听按下按键进行任何和所有更改的流。观察者将负责将其过滤到他们感兴趣的键子集,并可能设置本地
TaskCompletionSource
等。
专业人士:
缺点:
想法 4:投票
每行维护一个
Box<long>
,其装箱值包含该行当前版本的版本标记。
观察者定期轮询他们感兴趣的所有行,存储最后一个已知版本,并查看自上次轮询以来是否有任何版本更新。
专业人士:
缺点:
总的来说,我倾向于想法 4 作为迄今为止最可行的方法。但我对投票可能有一种非理性的仇恨..
所以我很想听听大家的想法 - 有人能想到更好的解决方案吗? 感觉这不是一个特别小众的问题。也许有一些我不知道的解决此类问题的标准方法?
这是一个有趣的问题。我的第一个想法是维护一个带有订阅的静态字典:
private static readonly Dictionary<TPrimaryKey, HashSet<Observer>> s_subscriptions;
Observer
源自TaskCompletionSource
:
class Observer : TaskCompletionSource
{
private readonly TPrimaryKey[] _keys;
public void Subscribe();
public void Complete();
}
观察者通过在
s_subscriptions
字典中添加它观察到的所有键来订阅:
public void Subscribe()
{
foreach (TPrimaryKey key in _keys)
{
s_subscriptions.GetOrAdd(key, () => new HashSet<Observer>()).Add(this);
}
}
当值发生变化时,您会转到字典并
Complete
所有正在观察该键的观察者:
if (_subscriptions.TryGetValue(key, out List<Observer> observers))
{
foreach (Observer observer in observers)
{
observer.Complete();
}
}
当观察者完成时,它将自身从
_subscriptions
中移除:
public void Complete()
{
foreach (TPrimaryKey key in _keys)
{
s_subscriptions[key].Remove(this);
}
base.SetResult(); // Completes the base.Task
}
您可能必须使用
Observer
选项实例化每个
TaskCreationOptions.RunContinuationsAsynchronously
,以便它在 ThreadPool
上完成,而不是在修改值的同一线程上完成。
为了最大限度地减少垃圾收集器的压力,您可以考虑使用
ValueTask
而不是 TaskCompletionSource
,并由可重用 IValueTaskSource
实现支持。但这可能会带来太多的工作而收效甚微。