我正在尝试编写一个程序,通过将项目放入来自不同线程的集合中并在迭代集合并处置项目的单个线程中清理它们来安排删除项目。
在这样做之前,我想知道什么会产生最佳性能,所以我尝试了
ConcurrentBag
、ConcurrentStack
和 ConcurrentQueue
并测量了添加 10,000,000 个项目所需的时间。
我使用以下程序来测试这一点:
class Program
{
static List<int> list = new List<int>();
static ConcurrentBag<int> bag = new ConcurrentBag<int>();
static ConcurrentStack<int> stack = new ConcurrentStack<int>();
static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
static void Main(string[] args)
{
run(addList);
run(addBag);
run(addStack);
run(addQueue);
Console.ReadLine();
}
private static void addList(int obj) { lock (list) { list.Add(obj); } }
private static void addStack(int obj) { stack.Push(obj); }
private static void addQueue(int obj) { queue.Enqueue(obj); }
private static void addBag(int obj) { bag.Add(obj); }
private static void run(Action<int> action)
{
Stopwatch stopwatch = Stopwatch.StartNew();
Parallel.For(0, 10000000, new ParallelOptions() { MaxDegreeOfParallelism = # },
action);
stopwatch.Stop();
Console.WriteLine(action.Method.Name + " takes " + stopwatch.Elapsed);
}
}
其中 # 是使用的线程数。
但结果相当令人困惑:
有 8 个线程:
有 1 个线程:
因此,无论有多少线程,似乎只锁定一个普通的旧列表都比使用任何并发集合更快,除非队列需要处理大量写入。
编辑:在下面关于垃圾和调试构建的评论之后: 是的,这会影响基准。调试构建影响将是线性的,垃圾将随着内存使用量的增加而增加。
然而多次运行相同的测试会得到大致相同的结果。
我将收集的初始化移至测试运行之前,并在运行后收集垃圾,如下所示:
list = new List<int>();
run(addList);
list = null;
GC.Collect();
将
MaxDegreeOfParallelism
设置为 8,我得到以下结果:
每次运行代码时都会有 0.02 秒的偏差。
并发收集并不总是更快。他们中的大多数人只在较高级别的争用中看到性能提升,并且实际工作负载也会产生影响。查看 PFX 团队的这篇论文:
http://blogs.msdn.com/b/pfxteam/archive/2010/04/26/9997562.aspx
但要注意过早的优化。将有效的东西放在一起然后进行优化,特别是因为实际工作负载很重要。此外,将锁作为性能瓶颈的情况非常罕见,通常有一些 OP 或其他算法需要更长的时间!
不要忘记,您不必将项目添加到集合中,但也必须检索它们。因此,更公平的比较是基于 Monitor 的 Queue
然后我在我的机器上得到以下结果(我将迭代次数增加了 10 倍):
但有趣的不仅仅是表演。看看这两种方法:检查 Add/ConsumeQueue1 的正确性非常困难,而由于 BlockingCollection
static Queue<int> queue1 = new Queue<int>();
static BlockingCollection<int> queue2 = new BlockingCollection<int>();
static void Main(string[] args)
{
Run(AddQueue1, ConsumeQueue1);
Run(AddQueue2, ConsumeQueue2);
Console.ReadLine();
}
private static void AddQueue1(int obj)
{
lock (queue1)
{
queue1.Enqueue(obj);
if (queue1.Count == 1)
Monitor.Pulse(queue1);
}
}
private static void ConsumeQueue1()
{
lock (queue1)
{
while (true)
{
while (queue1.Count == 0)
Monitor.Wait(queue1);
var item = queue1.Dequeue();
// do something with item
}
}
}
private static void AddQueue2(int obj)
{
queue2.TryAdd(obj);
}
private static void ConsumeQueue2()
{
foreach (var item in queue2.GetConsumingEnumerable())
{
// do something with item
}
}
private static void Run(Action<int> action, ThreadStart consumer)
{
new Thread(consumer) { IsBackground = true }.Start();
Stopwatch stopwatch = Stopwatch.StartNew();
Parallel.For(0, 100000000, new ParallelOptions() { MaxDegreeOfParallelism = 8 }, action);
stopwatch.Stop();
Console.WriteLine(action.Method.Name + " takes " + stopwatch.Elapsed);
}
我想看看添加和获取的性能比较。这是我使用的代码:
class Program
{
static List<int> list = new List<int>();
static ConcurrentBag<int> bag = new ConcurrentBag<int>();
static ConcurrentStack<int> stack = new ConcurrentStack<int>();
static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
static void Main(string[] args)
{
list = new List<int>();
run(addList);
run(takeList);
list = null;
GC.Collect();
bag = new ConcurrentBag<int>();
run(addBag);
run(takeBag);
bag = null;
GC.Collect();
stack = new ConcurrentStack<int>();
run(addStack);
run(takeStack);
stack = null;
GC.Collect();
queue = new ConcurrentQueue<int>();
run(addQueue);
run(takeQueue);
queue = null;
GC.Collect();
Console.ReadLine();
}
private static void takeList(int obj)
{
lock (list)
{
if (list.Count == 0)
return;
int output = list[obj];
}
}
private static void takeStack(int obj)
{
stack.TryPop(out int output);
}
private static void takeQueue(int obj)
{
queue.TryDequeue(out int output);
}
private static void takeBag(int obj)
{
bag.TryTake(out int output);
}
private static void addList(int obj) { lock (list) { list.Add(obj); } }
private static void addStack(int obj) { stack.Push(obj); }
private static void addQueue(int obj) { queue.Enqueue(obj); }
private static void addBag(int obj) { bag.Add(obj); }
private static void run(Action<int> action)
{
Stopwatch stopwatch = Stopwatch.StartNew();
Parallel.For(0, 10000000, new ParallelOptions()
{
MaxDegreeOfParallelism = 8
}, action);
stopwatch.Stop();
Console.WriteLine(action.Method.Name + " takes " + stopwatch.Elapsed);
}
}
输出为:
是的,但重点是您需要多个线程的并发性,在很长一段时间内以并发性运行它以查看平均性能,因为这没有考虑不同集合的锁定策略。