Azure 函数中与进程内 ServiceBusSender 绑定等效的隔离进程是什么?

问题描述 投票:0回答:1

我们有许多进程内的 azure 函数,它们具有以下绑定:

  public async Task Run(
      [ServiceBusTrigger("%TriggerQueueName%", Connection = "TriggerQueueConnection")] InitMessageBody messageBody, string correlationId, ILogger log,
      [ServiceBus("%TriggerQueueName%", Connection = "TriggerQueueConnection", EntityType = ServiceBusEntityType.Queue)] ServiceBusSender originatingQueueSender,
      [ServiceBus("%OutputQueueName%", Connection = "OutputQueueConnection", EntityType = ServiceBusEntityType.Queue)] ServiceBusSender outputQueueSender
      )

我们使用 ServiceBusSender 实例而不是输出绑定,因为它使我们可以轻松控制批量发送(在输出发送方的情况下),但我们还有指数退避重试处理程序模式,用于安排原始消息的副本当发生暂时性错误时返回到原始队列。

我还有更多函数需要编写,现在隔离是 Azure Functions 的首选模型(也是目前 .net8 唯一可用的模型),我想我应该尝试一下,但我不知道如何实现实现与使用 ServiceBusSender 类似的策略,我不认为它可以作为 v5.x 函数中的绑定使用。

对于输出批处理,据我所知,虽然您可以通过将返回类型设置为 T[] 在输出上发送多条消息,但您只能提供消息正文,并且无法控制 messageId 或 correlationId 等元数据,这我想要。

为了将消息调度回原始队列,我能看到的唯一方法是在函数构造函数中启动 ServiceBusClient 并使用它来生成 ServiceBusSender。

手动启动/关闭客户端和发件人是执行我想要执行的操作的唯一/正确方法吗?

如果这是正确的方法,那么应该如何处理ServiceBusClient的处置? 我是否最好使用 DI 为客户端提供服务,并在适当的时候让其进行处理(我已经为 Web 应用程序集成这样做了,但不确定在给定函数生命周期的情况下如何最好地做到这一点)?

感觉出现了很多麻烦,我猜是由进程内模型处理的 - 思考是否应该回退到 .Net6,直到进程内模型可用于 .NET8。

azure-functions azure-servicebus-queues .net-8.0 azure-functions-isolated
1个回答
0
投票

我在.NET 8隔离模型中尝试了下面的示例代码,使用服务总线队列触发器功能,从输入队列发送消息并使用ServiceBusClient在输出队列中接收消息。

代码:

Function1.cs:

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace FunctionApp101
{
    public class Function1
    {
        private readonly ILogger<Function1> _logger;
        private readonly ServiceBusClient _serviceBusClient;
        private readonly ServiceBusClient _outputQueueServiceBusClient;

        public Function1(ILogger<Function1> logger, ServiceBusClient serviceBusClient, ServiceBusClient outputQueueServiceBusClient)
        {
            _logger = logger;
            _serviceBusClient = serviceBusClient;
            _outputQueueServiceBusClient = outputQueueServiceBusClient;
        }

        [Function(nameof(Function1))]
        public async Task Run(
            [ServiceBusTrigger("<input_queueName>", Connection = "servicebus")]
            ServiceBusReceivedMessage message,
            ServiceBusMessageActions messageActions)
        {
            _logger.LogInformation("Message ID: {id}", message.MessageId);
            _logger.LogInformation("Message Body: {body}", message.Body);
            _logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
            await SendMessageToAnotherQueueAsync(message);
            await messageActions.CompleteMessageAsync(message);
        }

        private async Task SendMessageToAnotherQueueAsync(ServiceBusReceivedMessage message)
        {
            try
            {
                var sender = _outputQueueServiceBusClient.CreateSender("<output_queueName>");
                var newMessage = new ServiceBusMessage(message.Body)
                {
                    MessageId = Guid.NewGuid().ToString(),
                    CorrelationId = message.MessageId 
                };
                await sender.SendMessageAsync(newMessage);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error sending message to output queue.");
            }
        }
    }
}

Program.cs:

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.DependencyInjection;
using Azure.Messaging.ServiceBus;
using System;
using Microsoft.Extensions.Hosting;

var host = new HostBuilder()
    .ConfigureFunctionsWebApplication()
    .ConfigureServices(services =>
    {
        services.AddSingleton(provider =>
        {
            var connectionString = Environment.GetEnvironmentVariable("servicebus");
            return new ServiceBusClient(connectionString);
        });
        services.AddSingleton(provider =>
        {
            var outputQueueConnectionString = Environment.GetEnvironmentVariable("outputQueueConnectionString");
            return new ServiceBusClient(outputQueueConnectionString);
        });
    })
    .Build();

host.Run();

local.settings.json:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "servicebus": "<inputqueue_connec>",
    "outputQueueConnectionString": "<outputqueue_connec>"
  }
}

输出:

以下功能开始运行,如下所示。

enter image description here

接下来,我向 Azure 服务总线中的输入队列(kamqueue)发送一条消息,如下所示。

enter image description here

我在 Azure 门户的输出队列中收到了上述输入队列消息。

enter image description here

我成功从输入队列(kamqueue)发送消息并在输出队列中接收它。

Azure Functions Core Tools
Core Tools Version:       4.0.5504 Commit hash: N/A +f829589bcaxxxxxxxxxxxxxx (64-bit)
Function Runtime Version: 4.28.3.21820

[2024-02-17T02:04:55.530Z] Found C:\Users\xxxxxx\source\repos\FunctionApp101\FunctionApp101\FunctionApp101.csproj. Using for user secrets file configuration.
[2024-02-17T02:05:05.349Z] Azure Functions .NET Worker (PID: 4948) initialized in debug mode. Waiting for debugger to attach...
[2024-02-17T02:05:05.590Z] Worker process started and initialized.

Functions:

        Function1: serviceBusTrigger

For detailed output, run func with --verbose flag.
[2024-02-17T02:05:10.915Z] Host lock lease acquired by instance ID '0000000000xxxxxxxxxxxxxxxx'.
[2024-02-17T02:05:40.240Z] Executing 'Functions.Function1' (Reason='(null)', Id=4129fa78xxxxxxxxxxxxx)
[2024-02-17T02:05:40.244Z] Trigger Details: MessageId: b6103f31586540888341xxxxxxxxxxxx, SequenceNumber: 4, DeliveryCount: 1, EnqueuedTimeUtc: 2024-02-17T02:05:39.2850000+00:00, LockedUntilUtc: 2024-02-17T02:06:39.3010000+00:00, SessionId: (null)
[2024-02-17T02:05:41.060Z] Message Body: Hi Kamali
[2024-02-17T02:05:41.060Z] Message ID: b6103f31586540888341xxxxxxxxxxx
[2024-02-17T02:05:41.060Z] Message Content-Type: (null)
[2024-02-17T02:05:45.300Z] Start processing HTTP request POST http://127.0.0.1:50310/Settlement/Complete
[2024-02-17T02:05:45.305Z] Sending HTTP request POST http://127.0.0.1:50310/Settlement/Complete
[2024-02-17T02:05:45.914Z] Received HTTP response headers after 592.3766ms - 200
[2024-02-17T02:05:45.916Z] End processing HTTP request after 630.5462ms - 200
[2024-02-17T02:05:46.065Z] Executed 'Functions.Function1' (Succeeded, Id=4129fa78xxxxxxxxxxxxx, Duration=5982ms)

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.