多线程MSMQ监听

问题描述 投票:0回答:2

我目前正在像这样从我的 MSMQ 中读取内容(为简洁起见,进行了简化):

public void Start()
{
    this.queue.ReceiveCompleted += this.ReceiveCompleted;
    this.queue.BeginReceive();
}

void ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
    this.queue.EndReceive(e.AsyncResult);
            
    try
    {
        var m = e.Message;
        m.Formatter = this.formatter;

        this.Handle(m.Body);
    }
    finally
    {
        this.queue.BeginReceive();
    }
}

然而,这只允许我串行处理消息。如何修改此代码以允许并行消息处理?

我知道我可以将

this.queue.BeginReceive();
finally
移出并移至
ReceiveCompleted
的顶部,但是如何阻止它产生与我有消息一样多的线程呢?如何明智地控制并行级别,以免线程池泛滥?是否有一些内置的机制,或者我必须编写自己的管理器?

编辑

我的目标是更快地处理消息。消息的处理涉及对第三方的异步调用,因此目前我的实现在遍历队列上浪费了大量时间。

c# msmq
2个回答
4
投票

我认为托管更多队列读取器实例会更简单。然后,您可以根据需要通过部署/取消部署更多实例来快速扩展和缩减。

它也成为一种管理,而不是一个发展问题,这才是扩展应该有的样子。


0
投票

您可以使用“生产者消费者模式”...

.NET 4 及更高版本具有

Concurrent
集合,它们是线程安全的,并且实现“大部分无锁”(因此在多线程中表现良好)...

您可以使用

BlockingCollection
与 TPL 结合来实现您想要的目标,而不必担心线程池饥饿或类似的情况...您只需将行
this.Handle(m.Body);
更改为类似
MyBlockingCollection.Add(m.Body);
的内容并启动“消费者线程”它在
MyBlockingCollection
上工作并执行实际工作(即,在
this.Handle
的下一个项目上调用
MyBlockingCollection
,例如通过调用
TryTake
获得)...请参阅上面的链接以获取基本示例...

© www.soinside.com 2019 - 2024. All rights reserved.