具有序列 ID 的线程安全固定大小循环缓冲区

问题描述 投票:0回答:3

我需要一个具有这些功能的队列:

  • 固定大小(即循环缓冲区)
  • 队列项有 id(如主键),它们是顺序的
  • 线程安全(用于多个 ASP.NET Core 请求)

为了避免锁定,我tried a

ConcurrentQueue
但发现竞争条件。所以我正在尝试一种自定义方法。

public interface IQueueItem
{
    long Id { get; set; }
}

public class CircularBuffer<T> : LinkedList<T> where T : class, IQueueItem
{
    public CircularBuffer(int capacity) => _capacity = capacity;
    private readonly int _capacity;

    private long _counter = 0;
    private readonly object _lock = new();

    public void Enqueue(T item)
    {
        lock (_lock) {         // works but feels "heavy"
            _counter++;
            item.Id = _counter;
            if (Count == _capacity) RemoveFirst();
            AddLast(item);
        }
    }
}

并测试:

public class Item : IQueueItem
{
    public long Id { get; set; }
    //...
}

public class Program
{
    public static void Main()
    {
        var q = new CircularBuffer<Item>(10);
        Parallel.For(0, 15, i => q.Enqueue(new Item()));
        Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
    }
}

给出正确的输出(即使被竞争线程排队,也被排序,并且具有固定大小,最旧的项目出队):

6、7、8、9、10、11、12、13、14、15

实际上,我有读取(即枚举)该队列的 Web 请求。

问题:如果一个线程正在枚举队列,而另一个线程正在向队列添加内容,我就会出错。 (我可以在读取之前使用

ToList()
,但是对于一个大队列来说,它会占用所有服务器的内存,因为这可以通过多个请求每秒完成多次)。我该如何处理这种情况? 我使用的是链表,但我可以灵活地使用任何结构。

(此外,这似乎是一个非常重的锁部分;有没有更高效的方法?)

更新
正如下面的评论中所问:我预计队列有几百到几万个项目,但项目本身很小(只是一些原始数据类型)。我希望每秒排队。从网络请求中读取的频率较低,假设每分钟几次(但可以同时发生在服务器写入队列的情况下)。

c# collections concurrency queue thread-safety
3个回答
3
投票

根据您在问题中提供的指标,您有很多选择。

CircularBuffer<T>
的预期用途并不是那么重。包装一个
lock
-protected
Queue<T>
应该工作得很好。在每次枚举时将队列内容复制到数组中(每秒复制 10,000 个元素几次)的成本不太可能引人注意。现代机器可以在眨眼之间完成这些事情。你必须每秒枚举集合数千次才能开始(稍微)成为一个问题。

为了多样性,我将提出一个不同的结构作为内部存储:

ImmutableQueue<T>
类。它的一大优点是可以被多个线程并发自由枚举。您不必担心并发突变,因为这个集合是不可变的。它创建后没有人可以更改它,永远。

更新此集合的方式是创建一个新集合并丢弃前一个集合。这个集合有方法

Enqueue
Dequeue
,它们不会改变现有的集合,而是返回一个具有所需突变的新集合。这听起来效率极低,但实际上并非如此。新集合重用了现有集合的大部分内部部分。当然,与变异
Queue<T>
相比,它的成本要高得多,可能要贵 10 倍左右,但你希望你能得到更多的回报,因为枚举它是多么便宜和无争议。

public class ConcurrentCircularBuffer<T> : IEnumerable<T> where T : IQueueItem
{
    private readonly object _locker = new();
    private readonly int _capacity;
    private ImmutableQueue<T> _queue = ImmutableQueue<T>.Empty;
    private int _count = 0;
    private long _lastId = 0;

    public ConcurrentCircularBuffer(int capacity) => _capacity = capacity;

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            item.Id = ++_lastId;
            _queue = _queue.Enqueue(item);
            if (_count < _capacity)
                _count++;
            else
                _queue = _queue.Dequeue();
        }
    }

    public IEnumerator<T> GetEnumerator()
        => ((IEnumerable<T>)Volatile.Read(ref _queue)).GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

实现

IQueueItem
接口的类应该这样实现:

public class QueueItem : IQueueItem
{
    private long _id;

    public long Id
    {
        get => Volatile.Read(ref _id);
        set => Volatile.Write(ref _id, value);
    }
}

否则,线程可能会看到一个

IQueueItem
未初始化
Id
的实例。有关解释,您可以阅读 Igor Ostrovsky 的this文章。我不是 100% 确定这是可能的,但我也不能保证这是不可能的。即使有了
Volatile
,将初始化
Id
的责任委托给外部组件对我来说仍然很脆弱。


1
投票

由于ConcurrentQueue在本题中out,可以试试fixed array

IQueueItem[] items = new IQueueItem[SIZE];
long id = 0;

排队很简单。

void Enqueue(IQueueItem item)
{
    long id2 = Interlocked.Increment(ref id);
    item.Id = id2 - 1;
    items[id2 % SIZE] = item;
}

要输出数据,只需将数组复制到一个新数组,然后对其进行排序。 (当然这里可以优化)

var arr = new IQueueItem[SIZE];
Array.Copy(items, arr, SIZE);
return arr.Where(a => a != null).OrderBy(a => a.Id);

由于并发插入,数组中可能会有一些空隙,可以取一个序列直到找到空隙。

var e = arr.Where(a => a != null).OrderBy(a => a.Id);
var firstId = e.First().Id;
return e.TakeWhile((a, index) => a.Id - index == firstId);

1
投票

这里是另一个实现,使用带锁的

Queue<T>

public interface IQueueItem
{
    long Id { get; set; }
}

public class CircularBuffer<T> : IEnumerable<T> where T : class, IQueueItem
{
    private readonly int _capacity;
    private readonly Queue<T> _queue;
    private long _lastId = 0;
    private readonly object _lock = new();

    public CircularBuffer(int capacity) {
        _capacity = capacity;
        _queue = new Queue<T>(capacity);
    }

    public void Enqueue(T item)
    {
        lock (_lock) {
            if (_capacity < _queue.Count)
                _queue.Dequeue();
            item.Id = ++_lastId;
            _queue.Enqueue(item);
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock) {
            var copy = _queue.ToArray();
            return ((IEnumerable<T>)copy).GetEnumerator();
        }
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

}

并测试:

public class Item : IQueueItem
{
    private long _id;

    public long Id
    {
        get => Volatile.Read(ref _id);
        set => Volatile.Write(ref _id, value);
    }
}

public class Program
{
    public static void Main()
    {
        var q = new CircularBuffer<Item>(10);
        Parallel.For(0, 15, i => q.Enqueue(new Item()));
        Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
    }
}

结果:

6、7、8、9、10、11、12、13、14、15

© www.soinside.com 2019 - 2024. All rights reserved.