如何使用rabbitmq删除响应队列?

问题描述 投票:0回答:1

考虑这个rabbitmq回复队列机制,其中仅检索1条消息,发送回客户端,然后必须删除生成的(自动删除)队列。

    string startResponseConsumer(IModel? channel)
    {
        string replyQueueName = channel.QueueDeclare().QueueName;

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, ea) =>
        {
            if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
                return;

            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);
            tcs.TrySetResult(response);
        };

        channel.BasicConsume(replyQueueName, true, consumer);

        return replyQueueName;
    }

它有效,消息响应被发送回客户端,但队列没有被删除。 10 次调用后,这是控制台中显示的内容。

因此每次调用此方法时,都会添加一个队列。我猜当通道被处置时,它们只会在退出时被删除。

但是这是一项服务,必须一直在线,不能永远积累队列。所以我在接收到的事件中添加了channel.Close()。

    string startResponseConsumer(IModel? channel)
    {
        string replyQueueName = channel.QueueDeclare().QueueName;

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, ea) =>
        {
            if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
                return;

            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);
            tcs.TrySetResult(response);

            // added this to have the queue removed
            if (channel != default)
            {
                channel.Close();
                channel = default;
            }
        };

        channel.BasicConsume(replyQueueName, true, consumer);

        return replyQueueName;
    }

这不是线程安全的,为每个线程创建的通道在处理所有消息之前都会关闭。

有没有正确的方法可以在收到消息后删除自动删除队列?

c# .net rabbitmq
1个回答
0
投票

这有效:

当一切正常并且回复消息收到 ACK 时,消费者关闭其通道。

        string replyQueueName = channel.QueueDeclare().QueueName;

        var consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, ea) =>
        {
            if (channel != default)
            {
                channel.Close();
                channel = default;
            }
        };

        channel.BasicConsume(replyQueueName, true, consumer);

但是,当由于任何原因(例如服务中断)未发送回复消息时,计时器将启动,导致通道关闭延迟执行 30 秒

DelayedExecution 仅使用 System.Threading.Timer。

DelayedExecution.Go(() => { 
        if (channel != null && channel.IsOpen) { channel.Close(); } 
}, 30000);
© www.soinside.com 2019 - 2024. All rights reserved.