RabbitMQ:Ack vs NoAck

问题描述 投票:0回答:1

目前,我正在测试RabbitMQ。我们希望通过确认(手动)来运行我们的消费者。但是,将noAck设置为false会使队列在排队时同步,并将性能从每秒消耗的20000条消息降低到每秒消耗的2条消息。我将预取增加到20,但是每秒只给我20条消息。

带有noAck = true的代码

class Program
{
    private static IEnumerable<int> Counter()
    {

        int i = 1;
        while (!_isStopping)
            yield return i;

        _isStopped = true;
    }

    private static volatile bool _isStopped;
    private static volatile bool _isStopping;

    private static string _queueName = ConfigurationManager.AppSettings["QueueName"];
    private static string _hostName = ConfigurationManager.AppSettings["HostName"];
    private static bool _isPersistent = false;

    public static void Main()
    {

        var factory = new ConnectionFactory() { HostName = _hostName };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {


                //channel.ConfirmSelect();
                channel.QueueDeclare(_queueName, _isPersistent,false, false, null);
                channel.BasicQos(0, 20, false);


                QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);


                channel.BasicConsume(_queueName,true, consumer);

                Task run = new Task(() =>
                {

                    Parallel.ForEach(Counter(), c =>
                    {
                        BasicDeliverEventArgs args = consumer.Queue.Dequeue();

                    });

                });


                run.Start();

                Console.WriteLine("Press enter to exit");
                Console.ReadLine();

                _isStopping = true;

                while (!_isStopped)
                    Thread.Sleep(1000);

            }
        }
    }

}

带有noAck的代码= false

class Program
{
    private static IEnumerable<int> Counter()
    {

        int i = 1;
        while (!_isStopping)
            yield return i;

        _isStopped = true;
    }

    private static volatile bool _isStopped;
    private static volatile bool _isStopping;

    private static string _queueName = ConfigurationManager.AppSettings["QueueName"];
    private static string _hostName = ConfigurationManager.AppSettings["HostName"];
    private static bool _isPersistent = false;

    public static void Main()
    {

        var factory = new ConnectionFactory() { HostName = _hostName };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {


                //channel.ConfirmSelect();
                channel.QueueDeclare(_queueName, _isPersistent,false, false, null);
                channel.BasicQos(0, 20, false);


                QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);


                channel.BasicConsume(_queueName,false, consumer);

                Task run = new Task(() =>
                {

                    Parallel.ForEach(Counter(), c =>
                    {
                        BasicDeliverEventArgs args = consumer.Queue.Dequeue();
                        channel.BasicAck(args.DeliveryTag, false);


                    });

                });


                run.Start();

                Console.WriteLine("Press enter to exit");
                Console.ReadLine();

                _isStopping = true;

                while (!_isStopped)
                    Thread.Sleep(1000);

            }
        }
    }

}

我知道创建多个渠道可能会提高性能,但是我认为创建数千个渠道不是一个好主意。为什么启用ack会有如此可怕的结果,我该如何解决?

c# queue rabbitmq
1个回答
0
投票

简而言之,在单个通道上,ack和并行化似乎有些奇怪的行为。如果将Parallel for更改为常规for循环,则性能会大大提高。我怀疑大多数线程都进入等待状态,因为它在发送确认之前无法处理下一条消息。

[根据Google组中某些RabbitMq频道,不应在线程之间共享。

© www.soinside.com 2019 - 2024. All rights reserved.