AzureServiceBus:处理同一会话的所有消息,如果失败则回滚

问题描述 投票:0回答:1

我有以下流程,我认为使用 AzureServiceBus 和会话很容易实现: 我有一个订阅,它从一条消息中创建 +5 条消息,并将所有这些消息通过批量发送到另一个启用会话的订阅。因此我为这些 +5 消息设置了相同的会话 ID。 发送消息不是问题。但下一步是:我想我可以创建一个处理器,它可以让我获得具有相同会话 ID 的所有消息。每条消息都会调用一个可能会失败的服务。如果一个调用失败,则之前在该会话中完成的所有其他消息都必须回滚。但我想这不是会话在 AzureServiceBus 中的工作方式。

我从以下代码开始(位于实现 IHostedService 的处理器类中的方法

StartAsync
内):

[...]
sessionProcessor = serviceBusClient.CreateSessionProcessor(
    processingOptions.CurrentValue.ServiceBusOptions.ServiceBusTopic,
    processorOptions.Subscription,
    options);

// Configure the message and error handler to use
sessionProcessor.ProcessMessageAsync += MessageHandler;
sessionProcessor.ProcessErrorAsync += ErrorHandler;

sessionProcessor.SessionInitializingAsync += SessionInitializingHandler;
sessionProcessor.SessionClosingAsync += SessionClosingHandler;

await sessionProcessor.StartProcessingAsync();
[...]

问题是

MessageHandler
只处理一条消息并立即完成它。那不是我需要的。 所以我在这里尝试过:

var receiver = await serviceBusClient.AcceptNextSessionAsync(processingOptions.CurrentValue.ServiceBusOptions.ServiceBusTopic, processorOptions.Subscription);

var messages = await receiver.ReceiveMessagesAsync(100);

using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
    foreach (var message in messages)
    {
        await ProcessSessionRequest(message);

        await receiver.CompleteMessageAsync(message);
        await receiver.SetSessionStateAsync(new BinaryData(SessionState.SessionInProcess));
    }

    ts.Complete();
}

但是这里我没有像会话处理器那样的错误处理。这意味着:如果一条消息失败并引发异常,我的处理器将完全关闭,而不是再次尝试或将消息移至死信队列并移至另一个会话。

有人知道如何正确实施吗?我唯一的另一个想法是用必要的事件编写我自己的处理器。但也许我只是错过了一些东西。

session azureservicebus azure-servicebus-subscriptions
1个回答
0
投票

感谢@Sean Feldman。是的,您概述的方法无法处理这种情况,而在单个操作中未收到每个会话的最大消息数。此外,确保会话中消息的原子处理,尤其是在发生故障的情况下,需要更多考虑。

  • 您需要循环,直到收到会话中的所有消息。继续获取消息,直到累积了预期数量或没有更多消息可用为止。

  • 将消息发送到 Azure 服务总线主题,确保具有相关内容的消息分组在同一会话 ID 下。

using Azure.Messaging.ServiceBus;
using System;
using System.Threading.Tasks;
using System.Transactions;

class Program
{
    static async Task Main(string[] args)
    {
        string connectionString = "<your_connection_string>";
        string topicName = "<your_topic_name>";
        string subscriptionName = "<your_subscription_name>";

        await ProcessMessages(connectionString, topicName, subscriptionName);
    }

    static async Task ProcessMessages(string connectionString, string topicName, string subscriptionName)
    {
        await using var client = new ServiceBusClient(connectionString);
        var processor = client.CreateSessionProcessor(topicName, subscriptionName);

        processor.ProcessMessageAsync += async args =>
        {
            var sessionReceiver = args.Session;
            var messages = await sessionReceiver.ReceiveMessagesAsync(10); // Maximum of 10 messages per session
            using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
            {
                foreach (var message in messages)
                {
                    try
                    {
                        // Process your message here
                        Console.WriteLine($"Processing message: {message.MessageId}");
                        await Task.Delay(1000); // Simulate processing
                        await sessionReceiver.CompleteMessageAsync(message);
                    }
                    catch (Exception ex)
                    {
                        // Log the exception
                        Console.WriteLine($"Error processing message {message.MessageId}: {ex.Message}");

                        // Rollback the transaction
                        ts.Dispose();

                        // Abandon the message
                        await sessionReceiver.AbandonMessageAsync(message);
                    }
                }
                ts.Complete(); // Commit the transaction if all messages are processed successfully
            }
        };

        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
        Console.WriteLine("Press any key to stop processing...");
        Console.ReadKey();
        await processor.StopProcessingAsync();
    }
}
  • 在消息处理逻辑中引入故障,观察系统如何处理故障和回滚。

当消息失败时(例如,由于异常),您需要回滚同一会话中所有先前完成的消息。这就是棘手的地方。将失败的消息移至死信队列 (DLQ) 以进行进一步分析。但是,这不会自动影响其他已完成的消息。

收到消息:

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.