从死信队列重新提交消息 - Azure 服务总线

问题描述 投票:0回答:8

我在 Azure 中创建了一个服务总线队列,它运行良好。如果消息在默认尝试(10 次)内没有被传递,那么它正确地将消息移动到死信队列。

现在,我想将这条消息从死信队列重新提交回其起源的队列,看看它是否再次有效。我已经使用服务总线资源管理器尝试了相同的操作。但它会立即移至死信队列。

是否可以做同样的事情,如果可以的话怎么做?

azure message-queue azureservicebus
8个回答
16
投票

我们有一批大约 60k 的消息,需要从死信队列中重新处理。通过 Service Bus Explorer 查看并发送回消息,每从我的计算机发送 1000 条消息大约需要 6 分钟。我通过为 DLQ 消息设置转发规则到另一个队列并从那里自动转发到原始队列解决了这个问题。对于所有 60k 消息,此解决方案花费了大约 30 秒。


13
投票

您需要发送具有相同负载的新消息。 ASB 设计上不支持消息重新提交。


13
投票

我们经常需要重新提交消息。 @Baglay-Vyacheslav 的回答很有帮助。我粘贴了一些更新的 C# 代码,可与最新的 Azure.Messaging.ServiceBus Nuget 包配合使用。

使得在两个队列/主题/订阅者上处理 DLQ 变得更快/更容易。

using Azure.Messaging.ServiceBus;
using System.Collections.Generic;
using System.Threading.Tasks;
using NLog;

namespace ServiceBus.Tools
{
    class TransferDeadLetterMessages
    {
        // https://github.com/Azure/azure-sdk-for-net/blob/Azure.Messaging.ServiceBus_7.2.1/sdk/servicebus/Azure.Messaging.ServiceBus/README.md

        private static Logger logger = LogManager.GetCurrentClassLogger();

        private static ServiceBusClient client;
        private static ServiceBusSender sender;
    
        public static async Task ProcessTopicAsync(string connectionString, string topicName, string subscriberName, int fetchCount = 10)
        {
            try
            {
                client = new ServiceBusClient(connectionString);
                sender = client.CreateSender(topicName);

                ServiceBusReceiver dlqReceiver = client.CreateReceiver(topicName, subscriberName, new ServiceBusReceiverOptions
                {
                    SubQueue = SubQueue.DeadLetter,
                    ReceiveMode = ServiceBusReceiveMode.PeekLock
                });

                await ProcessDeadLetterMessagesAsync($"topic: {topicName} -> subscriber: {subscriberName}", fetchCount, sender, dlqReceiver);
            }
            catch (Azure.Messaging.ServiceBus.ServiceBusException ex)
            {
                if (ex.Reason == Azure.Messaging.ServiceBus.ServiceBusFailureReason.MessagingEntityNotFound)
                {
                    logger.Error(ex, $"Topic:Subscriber '{topicName}:{subscriberName}' not found. Check that the name provided is correct.");
                }
                else
                {
                    throw;
                }
            }
            finally
            {
                await sender.CloseAsync();
                await client.DisposeAsync();
            }
        }

        public static async Task ProcessQueueAsync(string connectionString, string queueName, int fetchCount = 10)
        {         
            try
            {
                client = new ServiceBusClient(connectionString);
                sender = client.CreateSender(queueName);

                ServiceBusReceiver dlqReceiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
                {
                    SubQueue = SubQueue.DeadLetter,
                    ReceiveMode = ServiceBusReceiveMode.PeekLock
                });

                await ProcessDeadLetterMessagesAsync($"queue: {queueName}", fetchCount, sender, dlqReceiver);
            }
            catch (Azure.Messaging.ServiceBus.ServiceBusException ex)
            {
                if (ex.Reason == Azure.Messaging.ServiceBus.ServiceBusFailureReason.MessagingEntityNotFound)
                {
                    logger.Error(ex, $"Queue '{queueName}' not found. Check that the name provided is correct.");
                }
                else
                {
                    throw;
                }
            }
            finally
            {
                await sender.CloseAsync();
                await client.DisposeAsync();
            }
        }

        private static async Task ProcessDeadLetterMessagesAsync(string source, int fetchCount, ServiceBusSender sender, ServiceBusReceiver dlqReceiver)
        {
            var wait = new System.TimeSpan(0, 0, 10);

            logger.Info($"fetching messages ({wait.TotalSeconds} seconds retrieval timeout)");
            logger.Info(source);

            IReadOnlyList<ServiceBusReceivedMessage> dlqMessages = await dlqReceiver.ReceiveMessagesAsync(fetchCount, wait);

            logger.Info($"dl-count: {dlqMessages.Count}");

            int i = 1;

            foreach (var dlqMessage in dlqMessages)
            {
                logger.Info($"start processing message {i}");
                logger.Info($"dl-message-dead-letter-message-id: {dlqMessage.MessageId}");
                logger.Info($"dl-message-dead-letter-reason: {dlqMessage.DeadLetterReason}");
                logger.Info($"dl-message-dead-letter-error-description: {dlqMessage.DeadLetterErrorDescription}");

                ServiceBusMessage resubmittableMessage = new ServiceBusMessage(dlqMessage);

                await sender.SendMessageAsync(resubmittableMessage);

                await dlqReceiver.CompleteMessageAsync(dlqMessage);

                logger.Info($"finished processing message {i}");
                logger.Info("--------------------------------------------------------------------------------------");

                i++;
            }

            await dlqReceiver.CloseAsync();

            logger.Info($"finished");
        }
    }
}

7
投票

尝试消除死信原因

resubmittableMessage.Properties.Remove("DeadLetterReason");
resubmittableMessage.Properties.Remove("DeadLetterErrorDescription");

完整代码

using Microsoft.ServiceBus.Messaging;
using System.Transactions;

namespace ResubmitDeadQueue
{
    class Program
    {
        static void Main(string[] args)
        {

            var connectionString = "";
            var queueName = "";

            var queue = QueueClient.CreateFromConnectionString(connectionString, QueueClient.FormatDeadLetterPath(queueName), ReceiveMode.PeekLock);

            BrokeredMessage originalMessage
                ;
            var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
            do
            {
                originalMessage = queue.Receive();
                if (originalMessage != null)
                {
                    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
                    {
                        // Create new message
                        var resubmittableMessage = originalMessage.Clone();

                        // Remove dead letter reason and description
                        resubmittableMessage.Properties.Remove("DeadLetterReason");
                        resubmittableMessage.Properties.Remove("DeadLetterErrorDescription");

                        // Resend cloned DLQ message and complete original DLQ message
                        client.Send(resubmittableMessage);
                        originalMessage.Complete();

                        // Complete transaction
                        scope.Complete();
                    }
                }
            } while (originalMessage != null);
        }
    }
}

感谢这里的其他一些回复!


5
投票

当您修复并重新提交死信队列中的消息时,Service Bus Explorer 工具始终会创建原始消息的克隆。它不会有任何不同,因为默认情况下服务总线消息传递不提供任何消息修复和重新提交机制。我建议您调查一下为什么您的消息在重新提交时会出现在死信队列及其克隆中。希望这有帮助!


4
投票

它可能是“重复消息检测”,如Peter Berggreen所示,或者更可能的是,如果您直接将 BrokeredMessage 从死信队列移动到实时队列,则 DeliveryCount 仍将最大,并且将返回死信排队。

将 BrokeredMessage 从死信队列中拉出,使用 GetBody() 获取内容,使用该数据在新的 BrokeredMessage 中创建并将其发送到队列。您可以安全地执行此操作,方法是使用 peek 从死信队列中获取消息内容,然后将新消息发送到实时队列,然后再从死信队列中删除消息。这样,如果由于某种原因无法写入实时队列,您将不会丢失任何关键数据。

使用新的 BrokeredMessage,您不应该遇到“重复消息检测”的问题,并且 DeliveryCount 将重置为零。


2
投票

听起来可能与ASB的“重复消息检测”功能有关。

当您在 ServiceBus Explorer 中重新提交消息时,它将克隆该消息,因此新消息将具有与死信队列中的原始消息相同的 ID。

如果您在队列/主题上启用了“需要重复检测”,并且尝试在“重复检测历史记录时间窗口”内重新提交消息,则该消息将立即再次移至死信队列。

如果您想使用 Service Bus Explorer 重新提交死信消息,那么我认为您必须在队列/主题上禁用“需要重复检测”。


0
投票

以下是开发的示例 Python 代码,用于将消息从死信队列 (DLQ) 重新提交到 Azure 服务总线中的普通队列(非主题)。队列空间完全耗尽并且不响应新提交。

代码会小批量地重新提交消息(当前设置为 10 条),以确保通过在成功重新提交后立即从 DLQ 中删除消息来以相同的方式创建空间。

"""
Reads all DLQ messages from a Service Bus Queue and resubmits them to the original queue
"""  

# python -m pip install --upgrade pip
# python -m pip install --upgrade wheel
# python -m pip install --upgrade setuptools
# python -m pip install --upgrade azure-servicebus

import logging
from azure.servicebus import ServiceBusClient, ServiceBusMessage

# Your Service Bus connection string and queue name
CONNECTION_STRING = ""
QUEUE_NAME = "target-queue"

if __name__ == "__main__":

    print("Reads all DLQ messages from a Service Bus Queue and resubmits them to the original queue")

    logging.basicConfig(level=logging.WARNING)

    try:
        with ServiceBusClient.from_connection_string(CONNECTION_STRING) as client:
            with client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
                with client.get_queue_receiver(queue_name=f"{QUEUE_NAME}/$DeadLetterQueue") as receiver:

                    # Loop to ensure all messages are processed
                    while True:
                        dead_letter_messages = receiver.receive_messages(max_message_count=10, max_wait_time=5)
                        if not dead_letter_messages:
                            break  # Exit loop if no more messages

                        # Log progress by writing the time and number of messages received to the console
                        print(f"{cc.CGREEN}Received {len(dead_letter_messages)} messages from DLQ{cc.CEND}")

                        for message in dead_letter_messages:
                            try:
                                # Convert generator to string
                                BODY_STR = b''.join(message.body)

                                # Create new ServiceBusMessage
                                resubmitted_message = ServiceBusMessage(body=BODY_STR)

                                # Resubmit the message for processing
                                sender.send_messages(resubmitted_message)

                                # Complete the message from the DLQ
                                receiver.complete_message(message)

                                logging.info("Message resubmitted for processing and removed from DLQ")
                            except Exception as e:
                                logging.error("Error processing message: %s", e)

        logging.info("Completed processing DLQ messages.")
    except Exception as e:
        logging.error("An error occurred: %s", e)


    print("Done")

CONNECTION_STRING
值利用来自Azure服务总线的SAS(共享访问策略)或直接到表示为主要或辅助连接字符串的队列。

这节省了我很多时间,并且可能对其他人有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.