.NET - Azure Function - Azure 服务总线重复调用

问题描述 投票:0回答:1

我知道这可能是一个微不足道的问题,但我不知道该怎么办。我将 Azure 服务总线与 MassTransit 一起使用。

我有两项服务。 Service1 是一个 WebApi 应用程序,Service2 是几个响应服务总线主题的 Azure Functions 单元。

在 Service1 中,我有一个 TestMessage 主题的使用者。如果我通知该主题,罚款就会传递给消费者。但是当消费者内部发生异常时,故障消费者会立即通知它。

配置:

services.AddMassTransit(o =>
{
    o.AddConsumers(Assembly.GetExecutingAssembly());

    o.UsingAzureServiceBus((context, cfg) =>
    {
        var connectionStrings = context.GetRequiredService<IOptions<ConnectionStringOption>>();

        cfg.Host(new HostSettings
        {
            ServiceUri = new Uri("sb://" + connectionStrings.Value.Messaging),
            TokenCredential = new DefaultAzureCredential()
        });
        
        cfg.SubscriptionEndpoint<TestMessage>(formatter.Consumer<TestMessageConsumer>(), e =>
        {
            e.ConfigureConsumer<TestMessageConsumer>(context);
        });

        cfg.SubscriptionEndpoint<Fault<TestMessage>>(formatter.Consumer<TestMessageFaultConsumer>(), e =>
        {
            e.ConfigureConsumer<TestMessageFaultConsumer>(context);
        });
    });
});

消费者:

public class TestMessageConsumer : IConsumer<TestMessage>
{
    public Task Consume(ConsumeContext<TestMessage> context)
    {
        throw new Exception();
        return Task.CompletedTask;
    }
}

public class TestMessageFaultConsumer : IConsumer<Fault<TestMessage>>
{
    public Task Consume(ConsumeContext<Fault<TestMessage>> context)
    {
        return Task.CompletedTask;
    }
}

但是,如果我向 Service2 中的 Azure Function 发送消息并发生异常,则会重复调用,直到达到最大传递计数 (10)。然后,Service1 中的故障消费者会被调用多次。

service1中配置:

        if (!await adminClient.TopicExistsAsync(formatter.MessageName<TestMessage>()))
            await adminClient.CreateTopicAsync(new CreateTopicOptions(formatter.MessageName<TestMessage>()));
        if (!await adminClient.SubscriptionExistsAsync(formatter.MessageName<TestMessage>(), formatterCheckFunc.ConsumerFromMessageName<TestMessageConsumer>()))
            await adminClient.CreateSubscriptionAsync(new CreateSubscriptionOptions(formatter.MessageName<TestMessage>(), formatterCheckFunc.ConsumerFromMessageName<TestMessageConsumer>()));

servce2中的配置:

        services.AddMassTransitForAzureFunctions(cfg =>
        {
            cfg.AddConsumer<TestMessageConsumer>();
        }, "ServiceBusConnection", (context, configuration) =>
        {
            var connectionStrings = context.GetRequiredService<IOptions<ConnectionStringOption>>();

            var settings = new HostSettings
            {
                ServiceUri = new Uri("sb://" + connectionStrings.Value.Messaging),
                TokenCredential = new DefaultAzureCredential(),
            };

            configuration.Host(settings);
        });

功能:

    public Function(IMessageReceiver receiver)
    {
        _receiver = receiver;
    }

    [Microsoft.Azure.Functions.Worker.Function(nameof(Function))]
    
    public async Task Run([Microsoft.Azure.Functions.Worker.ServiceBusTrigger(TopicName, SubscriptionName, Connection = Connection)] ServiceBusReceivedMessage message,
        ServiceBusMessageActions messageActions, CancellationToken cancellationToken)
    {          
        await _receiver.HandleConsumer<TestMessageConsumer>(TopicName, SubscriptionName, message, cancellationToken);
    }

我想我并不完全清楚为什么它被这样多次调用,我真的不知道该怎么办。我希望如果我收到一条消息并且它陷入异常,则不再进行调用,因为如果它运行 10 次,我不知道是否可以复制数据库中的数据等。这对我来说是有意义的如果 Azure 功能不可用、消息通过网络丢失等情况,它将运行 10 次。

非常感谢

c# .net azure azureservicebus masstransit
1个回答
0
投票

Azure Function 处理的消息在遇到异常时会多次重试,导致多次调用 Service1 中的故障使用者的问题。为了处理它,我们可以使用消息处理行为来限制重试次数或指定重试策略。

services.AddMassTransit(o =>
{
    o.AddConsumers(Assembly.GetExecutingAssembly());

    o.UsingAzureServiceBus((context, cfg) =>
    {
        var connectionStrings = context.GetRequiredService<IOptions<ConnectionStringOption>>();

        cfg.Host(new HostSettings
        {
            ServiceUri = new Uri("sb://" + connectionStrings.Value.Messaging),
            TokenCredential = new DefaultAzureCredential()
        });
        
        cfg.SubscriptionEndpoint<TestMessage>(formatter.Consumer<TestMessageConsumer>(), e =>
        {
            e.ConfigureConsumer<TestMessageConsumer>(context);

            // Configure retry policy if needed
            e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5))); // Example retry policy: 3 retries with 5 seconds interval
        });

        cfg.SubscriptionEndpoint<Fault<TestMessage>>(formatter.Consumer<TestMessageFaultConsumer>(), e =>
        {
            e.ConfigureConsumer<TestMessageFaultConsumer>(context);
        });
    });
});

  • 在 Azure Function
    Function
    类中,您应该处理异常和消息以防止重试。

ServiceBusReceiver.AbandonMessageAsync
方法Azure.Messaging.ServiceBus


    catch (Exception ex)
    {
        // Log the exception or handle it appropriately
        // If processing failed due to an unrecoverable error, abandon the message
        await messageActions.AbandonMessageAsync(message);
    }
}

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.