异步等待对大型内存表中的行子集的更改

问题描述 投票:0回答:1

TL;博士:

  • 我有一个内存数据库,维护大量(10-5000 万)行
  • 我需要一种有效的方法让观察者开始监听其中的任何子集(通常大约 10,000 个项目),并异步等待,直到其中任何一个发生更改
  • 系统需要准备好至少有 50,000 个这样的观察者同时活跃
  • 假设每秒将写入/更新约 1,000 - 10,000 行

在 C# 中实现此模式的有效方法是什么?


更详细一点:

作为一个心理示例,假设我们正在设计一个内存数据库。观察者需要能够等待他们所关心的任何行子集的更改。因此,对于这个问题,界面可能看起来很简单

interface ITable<TPrimaryKey, TRow>
{
  Task WaitUntilAnyRowChanged(IReadOnlySet<TPrimaryKey> keysToListenTo, CancellationToken ct);
}

到目前为止,我想到的是:

想法 1:行上的 TaskCompletionSource

TaskCompletionSource
与每一行相关联。当值发生变化时,系统将完成与该行关联的现有
TaskCompletionSource
,并在下一次挂起的更新中将其替换为新的。然后,消费者(即
WaitUntilAnyRowChanged
)可以对所有相关的内容进行
Task.WhenAny(...)

缺点:

  1. 我预计将
    Task.WhenAny(...)
    链接到数千个
    TaskCompletionSource
    -s 上效率非常低
  2. 不是最大的问题,但需要相对大量的分配

想法 2:观察者上的 AsyncAutoResetEvent (或类似的同步结构)

我们交换角色。每行都有一个

List<AsyncAutoResetEvent>
。每个观察者都会创建一个自己的
AsyncAutoResetEvent
,并将其注册到他们感兴趣的所有行。写入时,一行将为所有侦听器设置信号。

缺点:

  1. 这有可能显着降低数据库的写入速度;本来可以是简单的字典写入现在变成了写入+循环潜在的数千个观察者,触发他们的AsyncAutoResentEvent

想法 3:观察者聆听所有变化

观察者可以监听按下按键进行任何和所有更改的流。观察者将负责将其过滤到他们感兴趣的键子集,并可能设置本地

TaskCompletionSource
等。

专业人士:

  1. 实施简单
  2. 相对可预测的性能(可以专用 x 个线程来将这些事件转发给观察者)

缺点:

  1. 如果观察者只关心一小部分行,则效率相当低。想象一下只监听 5000 万行表中的 1 行。

想法 4:投票

每行维护一个

Box<long>
,其装箱值包含该行当前版本的版本标记。 观察者定期轮询他们感兴趣的所有行,存储最后一个已知版本,并查看自上次轮询以来是否有任何版本更新。

专业人士:

  1. 简单
  2. 感觉可以很好地扩展:写入速度不受影响。对于观察者来说,循环遍历约 10k 盒装长整数来读取和比较它们的值应该相当快。
  3. 几乎无需GC

缺点:

  1. 没有人喜欢投票!更改检测会延迟轮询间隔,而不是总是立即拾取
  2. 随着观察到的键数量的增加,可能无法很好地扩展

总的来说,我倾向于想法 4 作为迄今为止最可行的方法。但我对投票可能有一种非理性的仇恨..

所以我很想听听大家的想法 - 有人能想到更好的解决方案吗? 感觉这不是一个特别小众的问题。也许有一些我不知道的解决此类问题的标准方法?

c# asynchronous events synchronization
1个回答
0
投票

这是一个有趣的问题。我的第一个想法是维护一个带有订阅的静态字典:

private static readonly Dictionary<TPrimaryKey, HashSet<Observer>> s_subscriptions;

Observer
源自
TaskCompletionSource

class Observer : TaskCompletionSource
{
    private readonly TPrimaryKey[] _keys;

    public void Subscribe();
    public void Complete();
}

观察者通过在

s_subscriptions
字典中添加它观察到的所有键来订阅:

    public void Subscribe()
    {
        foreach (TPrimaryKey key in _keys)
        {
            s_subscriptions.GetOrAdd(key, () => new HashSet<Observer>()).Add(this);
        }
    }

当值发生变化时,您会转到字典并

Complete
所有正在观察该键的观察者:

if (_subscriptions.TryGetValue(key, out List<Observer> observers))
{
    foreach (Observer observer in observers)
    {
        observer.Complete();
    }
}

当观察者完成时,它将自身从

_subscriptions
中移除:

    public void Complete()
    {
        foreach (TPrimaryKey key in _keys)
        {
            s_subscriptions[key].Remove(this);
        }
        base.SetResult(); // Completes the base.Task
    }

您可能必须使用

Observer
 选项实例化每个 
TaskCreationOptions.​RunContinuationsAsynchronously
,以便它在
ThreadPool
上完成,而不是在修改值的同一线程上完成。

为了最大限度地减少垃圾收集器的压力,您可以考虑使用

ValueTask
而不是
TaskCompletionSource
,并由可重用
IValueTaskSource
实现支持。但这可能会带来太多的工作而收效甚微。

© www.soinside.com 2019 - 2024. All rights reserved.