我知道这可能是一个微不足道的问题,但我不知道该怎么办。我将 Azure 服务总线与 MassTransit 一起使用。
我有两项服务。 Service1 是一个 WebApi 应用程序,Service2 是几个响应服务总线主题的 Azure Functions 单元。
在 Service1 中,我有一个 TestMessage 主题的使用者。如果我通知该主题,罚款就会传递给消费者。但是当消费者内部发生异常时,故障消费者会立即通知它。
配置:
services.AddMassTransit(o =>
{
o.AddConsumers(Assembly.GetExecutingAssembly());
o.UsingAzureServiceBus((context, cfg) =>
{
var connectionStrings = context.GetRequiredService<IOptions<ConnectionStringOption>>();
cfg.Host(new HostSettings
{
ServiceUri = new Uri("sb://" + connectionStrings.Value.Messaging),
TokenCredential = new DefaultAzureCredential()
});
cfg.SubscriptionEndpoint<TestMessage>(formatter.Consumer<TestMessageConsumer>(), e =>
{
e.ConfigureConsumer<TestMessageConsumer>(context);
});
cfg.SubscriptionEndpoint<Fault<TestMessage>>(formatter.Consumer<TestMessageFaultConsumer>(), e =>
{
e.ConfigureConsumer<TestMessageFaultConsumer>(context);
});
});
});
消费者:
public class TestMessageConsumer : IConsumer<TestMessage>
{
public Task Consume(ConsumeContext<TestMessage> context)
{
throw new Exception();
return Task.CompletedTask;
}
}
public class TestMessageFaultConsumer : IConsumer<Fault<TestMessage>>
{
public Task Consume(ConsumeContext<Fault<TestMessage>> context)
{
return Task.CompletedTask;
}
}
但是,如果我向 Service2 中的 Azure Function 发送消息并发生异常,则会重复调用,直到达到最大传递计数 (10)。然后,Service1 中的故障消费者会被调用多次。
service1中配置:
if (!await adminClient.TopicExistsAsync(formatter.MessageName<TestMessage>()))
await adminClient.CreateTopicAsync(new CreateTopicOptions(formatter.MessageName<TestMessage>()));
if (!await adminClient.SubscriptionExistsAsync(formatter.MessageName<TestMessage>(), formatterCheckFunc.ConsumerFromMessageName<TestMessageConsumer>()))
await adminClient.CreateSubscriptionAsync(new CreateSubscriptionOptions(formatter.MessageName<TestMessage>(), formatterCheckFunc.ConsumerFromMessageName<TestMessageConsumer>()));
servce2中的配置:
services.AddMassTransitForAzureFunctions(cfg =>
{
cfg.AddConsumer<TestMessageConsumer>();
}, "ServiceBusConnection", (context, configuration) =>
{
var connectionStrings = context.GetRequiredService<IOptions<ConnectionStringOption>>();
var settings = new HostSettings
{
ServiceUri = new Uri("sb://" + connectionStrings.Value.Messaging),
TokenCredential = new DefaultAzureCredential(),
};
configuration.Host(settings);
});
功能:
public Function(IMessageReceiver receiver)
{
_receiver = receiver;
}
[Microsoft.Azure.Functions.Worker.Function(nameof(Function))]
public async Task Run([Microsoft.Azure.Functions.Worker.ServiceBusTrigger(TopicName, SubscriptionName, Connection = Connection)] ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions, CancellationToken cancellationToken)
{
await _receiver.HandleConsumer<TestMessageConsumer>(TopicName, SubscriptionName, message, cancellationToken);
}
我想我并不完全清楚为什么它被这样多次调用,我真的不知道该怎么办。我希望如果我收到一条消息并且它陷入异常,则不再进行调用,因为如果它运行 10 次,我不知道是否可以复制数据库中的数据等。这对我来说是有意义的如果 Azure 功能不可用、消息通过网络丢失等情况,它将运行 10 次。
非常感谢
Azure Function 处理的消息在遇到异常时会多次重试,导致多次调用 Service1 中的故障使用者的问题。为了处理它,我们可以使用消息处理行为来限制重试次数或指定重试策略。
services.AddMassTransit(o =>
{
o.AddConsumers(Assembly.GetExecutingAssembly());
o.UsingAzureServiceBus((context, cfg) =>
{
var connectionStrings = context.GetRequiredService<IOptions<ConnectionStringOption>>();
cfg.Host(new HostSettings
{
ServiceUri = new Uri("sb://" + connectionStrings.Value.Messaging),
TokenCredential = new DefaultAzureCredential()
});
cfg.SubscriptionEndpoint<TestMessage>(formatter.Consumer<TestMessageConsumer>(), e =>
{
e.ConfigureConsumer<TestMessageConsumer>(context);
// Configure retry policy if needed
e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5))); // Example retry policy: 3 retries with 5 seconds interval
});
cfg.SubscriptionEndpoint<Fault<TestMessage>>(formatter.Consumer<TestMessageFaultConsumer>(), e =>
{
e.ConfigureConsumer<TestMessageFaultConsumer>(context);
});
});
});
Function
类中,您应该处理异常和消息以防止重试。ServiceBusReceiver.AbandonMessageAsync
方法Azure.Messaging.ServiceBus
catch (Exception ex)
{
// Log the exception or handle it appropriately
// If processing failed due to an unrecoverable error, abandon the message
await messageActions.AbandonMessageAsync(message);
}
}