我需要一个具有这些功能的队列:
为了避免锁定,我tried a
ConcurrentQueue
但发现竞争条件。所以我正在尝试一种自定义方法。
public interface IQueueItem
{
long Id { get; set; }
}
public class CircularBuffer<T> : LinkedList<T> where T : class, IQueueItem
{
public CircularBuffer(int capacity) => _capacity = capacity;
private readonly int _capacity;
private long _counter = 0;
private readonly object _lock = new();
public void Enqueue(T item)
{
lock (_lock) { // works but feels "heavy"
_counter++;
item.Id = _counter;
if (Count == _capacity) RemoveFirst();
AddLast(item);
}
}
}
并测试:
public class Item : IQueueItem
{
public long Id { get; set; }
//...
}
public class Program
{
public static void Main()
{
var q = new CircularBuffer<Item>(10);
Parallel.For(0, 15, i => q.Enqueue(new Item()));
Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
}
}
给出正确的输出(即使被竞争线程排队,也被排序,并且具有固定大小,最旧的项目出队):
6、7、8、9、10、11、12、13、14、15
实际上,我有读取(即枚举)该队列的 Web 请求。
问题:如果一个线程正在枚举队列,而另一个线程正在向队列添加内容,我就会出错。 (我可以在读取之前使用
ToList()
,但是对于一个大队列来说,它会占用所有服务器的内存,因为这可以通过多个请求每秒完成多次)。我该如何处理这种情况? 我使用的是链表,但我可以灵活地使用任何结构。
(此外,这似乎是一个非常重的锁部分;有没有更高效的方法?)
更新
正如下面的评论中所问:我预计队列有几百到几万个项目,但项目本身很小(只是一些原始数据类型)。我希望每秒排队。从网络请求中读取的频率较低,假设每分钟几次(但可以同时发生在服务器写入队列的情况下)。
根据您在问题中提供的指标,您有很多选择。
CircularBuffer<T>
的预期用途并不是那么重。包装一个 lock
-protected Queue<T>
应该工作得很好。在每次枚举时将队列内容复制到数组中(每秒复制 10,000 个元素几次)的成本不太可能引人注意。现代机器可以在眨眼之间完成这些事情。你必须每秒枚举集合数千次才能开始(稍微)成为一个问题。
ImmutableQueue<T>
类。它的一大优点是可以被多个线程并发自由枚举。您不必担心并发突变,因为这个集合是不可变的。它创建后没有人可以更改它,永远。
更新此集合的方式是创建一个新集合并丢弃前一个集合。这个集合有方法
Enqueue
和 Dequeue
,它们不会改变现有的集合,而是返回一个具有所需突变的新集合。这听起来效率极低,但实际上并非如此。新集合重用了现有集合的大部分内部部分。当然,与变异 Queue<T>
相比,它的成本要高得多,可能要贵 10 倍左右,但你希望你能得到更多的回报,因为枚举它是多么便宜和无争议。
public class ConcurrentCircularBuffer<T> : IEnumerable<T> where T : IQueueItem
{
private readonly object _locker = new();
private readonly int _capacity;
private ImmutableQueue<T> _queue = ImmutableQueue<T>.Empty;
private int _count = 0;
private long _lastId = 0;
public ConcurrentCircularBuffer(int capacity) => _capacity = capacity;
public void Enqueue(T item)
{
lock (_locker)
{
item.Id = ++_lastId;
_queue = _queue.Enqueue(item);
if (_count < _capacity)
_count++;
else
_queue = _queue.Dequeue();
}
}
public IEnumerator<T> GetEnumerator()
=> ((IEnumerable<T>)Volatile.Read(ref _queue)).GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
实现
IQueueItem
接口的类应该这样实现:
public class QueueItem : IQueueItem
{
private long _id;
public long Id
{
get => Volatile.Read(ref _id);
set => Volatile.Write(ref _id, value);
}
}
否则,线程可能会看到一个
IQueueItem
未初始化 Id
的实例。有关解释,您可以阅读 Igor Ostrovsky 的this文章。我不是 100% 确定这是可能的,但我也不能保证这是不可能的。即使有了 Volatile
,将初始化 Id
的责任委托给外部组件对我来说仍然很脆弱。
由于ConcurrentQueue在本题中out,可以试试fixed array
IQueueItem[] items = new IQueueItem[SIZE];
long id = 0;
排队很简单。
void Enqueue(IQueueItem item)
{
long id2 = Interlocked.Increment(ref id);
item.Id = id2 - 1;
items[id2 % SIZE] = item;
}
要输出数据,只需将数组复制到一个新数组,然后对其进行排序。 (当然这里可以优化)
var arr = new IQueueItem[SIZE];
Array.Copy(items, arr, SIZE);
return arr.Where(a => a != null).OrderBy(a => a.Id);
由于并发插入,数组中可能会有一些空隙,可以取一个序列直到找到空隙。
var e = arr.Where(a => a != null).OrderBy(a => a.Id);
var firstId = e.First().Id;
return e.TakeWhile((a, index) => a.Id - index == firstId);
这里是另一个实现,使用带锁的
Queue<T>
。
public interface IQueueItem
{
long Id { get; set; }
}
public class CircularBuffer<T> : IEnumerable<T> where T : class, IQueueItem
{
private readonly int _capacity;
private readonly Queue<T> _queue;
private long _lastId = 0;
private readonly object _lock = new();
public CircularBuffer(int capacity) {
_capacity = capacity;
_queue = new Queue<T>(capacity);
}
public void Enqueue(T item)
{
lock (_lock) {
if (_capacity < _queue.Count)
_queue.Dequeue();
item.Id = ++_lastId;
_queue.Enqueue(item);
}
}
public IEnumerator<T> GetEnumerator()
{
lock (_lock) {
var copy = _queue.ToArray();
return ((IEnumerable<T>)copy).GetEnumerator();
}
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
并测试:
public class Item : IQueueItem
{
private long _id;
public long Id
{
get => Volatile.Read(ref _id);
set => Volatile.Write(ref _id, value);
}
}
public class Program
{
public static void Main()
{
var q = new CircularBuffer<Item>(10);
Parallel.For(0, 15, i => q.Enqueue(new Item()));
Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
}
}
结果:
6、7、8、9、10、11、12、13、14、15