我有以下流程,我认为使用 AzureServiceBus 和会话很容易实现: 我有一个订阅,它从一条消息中创建 +5 条消息,并将所有这些消息通过批量发送到另一个启用会话的订阅。因此我为这些 +5 消息设置了相同的会话 ID。 发送消息不是问题。但下一步是:我想我可以创建一个处理器,它可以让我获得具有相同会话 ID 的所有消息。每条消息都会调用一个可能会失败的服务。如果一个调用失败,则之前在该会话中完成的所有其他消息都必须回滚。但我想这不是会话在 AzureServiceBus 中的工作方式。
我从以下代码开始(位于实现 IHostedService 的处理器类中的方法
StartAsync
内):
[...]
sessionProcessor = serviceBusClient.CreateSessionProcessor(
processingOptions.CurrentValue.ServiceBusOptions.ServiceBusTopic,
processorOptions.Subscription,
options);
// Configure the message and error handler to use
sessionProcessor.ProcessMessageAsync += MessageHandler;
sessionProcessor.ProcessErrorAsync += ErrorHandler;
sessionProcessor.SessionInitializingAsync += SessionInitializingHandler;
sessionProcessor.SessionClosingAsync += SessionClosingHandler;
await sessionProcessor.StartProcessingAsync();
[...]
问题是
MessageHandler
只处理一条消息并立即完成它。那不是我需要的。
所以我在这里尝试过:
var receiver = await serviceBusClient.AcceptNextSessionAsync(processingOptions.CurrentValue.ServiceBusOptions.ServiceBusTopic, processorOptions.Subscription);
var messages = await receiver.ReceiveMessagesAsync(100);
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
foreach (var message in messages)
{
await ProcessSessionRequest(message);
await receiver.CompleteMessageAsync(message);
await receiver.SetSessionStateAsync(new BinaryData(SessionState.SessionInProcess));
}
ts.Complete();
}
但是这里我没有像会话处理器那样的错误处理。这意味着:如果一条消息失败并引发异常,我的处理器将完全关闭,而不是再次尝试或将消息移至死信队列并移至另一个会话。
有人知道如何正确实施吗?我唯一的另一个想法是用必要的事件编写我自己的处理器。但也许我只是错过了一些东西。
感谢@Sean Feldman。是的,您概述的方法无法处理这种情况,而在单个操作中未收到每个会话的最大消息数。此外,确保会话中消息的原子处理,尤其是在发生故障的情况下,需要更多考虑。
您需要循环,直到收到会话中的所有消息。继续获取消息,直到累积了预期数量或没有更多消息可用为止。
将消息发送到 Azure 服务总线主题,确保具有相关内容的消息分组在同一会话 ID 下。
using Azure.Messaging.ServiceBus;
using System;
using System.Threading.Tasks;
using System.Transactions;
class Program
{
static async Task Main(string[] args)
{
string connectionString = "<your_connection_string>";
string topicName = "<your_topic_name>";
string subscriptionName = "<your_subscription_name>";
await ProcessMessages(connectionString, topicName, subscriptionName);
}
static async Task ProcessMessages(string connectionString, string topicName, string subscriptionName)
{
await using var client = new ServiceBusClient(connectionString);
var processor = client.CreateSessionProcessor(topicName, subscriptionName);
processor.ProcessMessageAsync += async args =>
{
var sessionReceiver = args.Session;
var messages = await sessionReceiver.ReceiveMessagesAsync(10); // Maximum of 10 messages per session
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
foreach (var message in messages)
{
try
{
// Process your message here
Console.WriteLine($"Processing message: {message.MessageId}");
await Task.Delay(1000); // Simulate processing
await sessionReceiver.CompleteMessageAsync(message);
}
catch (Exception ex)
{
// Log the exception
Console.WriteLine($"Error processing message {message.MessageId}: {ex.Message}");
// Rollback the transaction
ts.Dispose();
// Abandon the message
await sessionReceiver.AbandonMessageAsync(message);
}
}
ts.Complete(); // Commit the transaction if all messages are processed successfully
}
};
processor.ProcessErrorAsync += args =>
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
Console.WriteLine("Press any key to stop processing...");
Console.ReadKey();
await processor.StopProcessingAsync();
}
}
当消息失败时(例如,由于异常),您需要回滚同一会话中所有先前完成的消息。这就是棘手的地方。将失败的消息移至死信队列 (DLQ) 以进行进一步分析。但是,这不会自动影响其他已完成的消息。
收到消息: