我目前正在像这样从我的 MSMQ 中读取内容(为简洁起见,进行了简化):
public void Start()
{
this.queue.ReceiveCompleted += this.ReceiveCompleted;
this.queue.BeginReceive();
}
void ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
this.queue.EndReceive(e.AsyncResult);
try
{
var m = e.Message;
m.Formatter = this.formatter;
this.Handle(m.Body);
}
finally
{
this.queue.BeginReceive();
}
}
然而,这只允许我串行处理消息。如何修改此代码以允许并行消息处理?
我知道我可以将
this.queue.BeginReceive();
从 finally
移出并移至 ReceiveCompleted
的顶部,但是如何阻止它产生与我有消息一样多的线程呢?如何明智地控制并行级别,以免线程池泛滥?是否有一些内置的机制,或者我必须编写自己的管理器?
我的目标是更快地处理消息。消息的处理涉及对第三方的异步调用,因此目前我的实现在遍历队列上浪费了大量时间。
我认为托管更多队列读取器实例会更简单。然后,您可以根据需要通过部署/取消部署更多实例来快速扩展和缩减。
它也成为一种管理,而不是一个发展问题,这才是扩展应该有的样子。
您可以使用“生产者消费者模式”...
.NET 4 及更高版本具有
Concurrent
集合,它们是线程安全的,并且实现“大部分无锁”(因此在多线程中表现良好)...
BlockingCollection
与 TPL 结合来实现您想要的目标,而不必担心线程池饥饿或类似的情况...您只需将行 this.Handle(m.Body);
更改为类似 MyBlockingCollection.Add(m.Body);
的内容并启动“消费者线程”它在 MyBlockingCollection
上工作并执行实际工作(即,在 this.Handle
的下一个项目上调用 MyBlockingCollection
,例如通过调用 TryTake
获得)...请参阅上面的链接以获取基本示例...