我们有许多进程内的 azure 函数,它们具有以下绑定:
public async Task Run(
[ServiceBusTrigger("%TriggerQueueName%", Connection = "TriggerQueueConnection")] InitMessageBody messageBody, string correlationId, ILogger log,
[ServiceBus("%TriggerQueueName%", Connection = "TriggerQueueConnection", EntityType = ServiceBusEntityType.Queue)] ServiceBusSender originatingQueueSender,
[ServiceBus("%OutputQueueName%", Connection = "OutputQueueConnection", EntityType = ServiceBusEntityType.Queue)] ServiceBusSender outputQueueSender
)
我们使用 ServiceBusSender 实例而不是输出绑定,因为它使我们可以轻松控制批量发送(在输出发送方的情况下),但我们还有指数退避重试处理程序模式,用于安排原始消息的副本当发生暂时性错误时返回到原始队列。
我还有更多函数需要编写,现在隔离是 Azure Functions 的首选模型(也是目前 .net8 唯一可用的模型),我想我应该尝试一下,但我不知道如何实现实现与使用 ServiceBusSender 类似的策略,我不认为它可以作为 v5.x 函数中的绑定使用。
对于输出批处理,据我所知,虽然您可以通过将返回类型设置为 T[] 在输出上发送多条消息,但您只能提供消息正文,并且无法控制 messageId 或 correlationId 等元数据,这我想要。
为了将消息调度回原始队列,我能看到的唯一方法是在函数构造函数中启动 ServiceBusClient 并使用它来生成 ServiceBusSender。
手动启动/关闭客户端和发件人是执行我想要执行的操作的唯一/正确方法吗?
如果这是正确的方法,那么应该如何处理ServiceBusClient的处置? 我是否最好使用 DI 为客户端提供服务,并在适当的时候让其进行处理(我已经为 Web 应用程序集成这样做了,但不确定在给定函数生命周期的情况下如何最好地做到这一点)?
感觉出现了很多麻烦,我猜是由进程内模型处理的 - 思考是否应该回退到 .Net6,直到进程内模型可用于 .NET8。
我在.NET 8隔离模型中尝试了下面的示例代码,使用服务总线队列触发器功能,从输入队列发送消息并使用ServiceBusClient在输出队列中接收消息。
代码:
Function1.cs:
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace FunctionApp101
{
public class Function1
{
private readonly ILogger<Function1> _logger;
private readonly ServiceBusClient _serviceBusClient;
private readonly ServiceBusClient _outputQueueServiceBusClient;
public Function1(ILogger<Function1> logger, ServiceBusClient serviceBusClient, ServiceBusClient outputQueueServiceBusClient)
{
_logger = logger;
_serviceBusClient = serviceBusClient;
_outputQueueServiceBusClient = outputQueueServiceBusClient;
}
[Function(nameof(Function1))]
public async Task Run(
[ServiceBusTrigger("<input_queueName>", Connection = "servicebus")]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
await SendMessageToAnotherQueueAsync(message);
await messageActions.CompleteMessageAsync(message);
}
private async Task SendMessageToAnotherQueueAsync(ServiceBusReceivedMessage message)
{
try
{
var sender = _outputQueueServiceBusClient.CreateSender("<output_queueName>");
var newMessage = new ServiceBusMessage(message.Body)
{
MessageId = Guid.NewGuid().ToString(),
CorrelationId = message.MessageId
};
await sender.SendMessageAsync(newMessage);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error sending message to output queue.");
}
}
}
}
Program.cs:
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.DependencyInjection;
using Azure.Messaging.ServiceBus;
using System;
using Microsoft.Extensions.Hosting;
var host = new HostBuilder()
.ConfigureFunctionsWebApplication()
.ConfigureServices(services =>
{
services.AddSingleton(provider =>
{
var connectionString = Environment.GetEnvironmentVariable("servicebus");
return new ServiceBusClient(connectionString);
});
services.AddSingleton(provider =>
{
var outputQueueConnectionString = Environment.GetEnvironmentVariable("outputQueueConnectionString");
return new ServiceBusClient(outputQueueConnectionString);
});
})
.Build();
host.Run();
local.settings.json:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"servicebus": "<inputqueue_connec>",
"outputQueueConnectionString": "<outputqueue_connec>"
}
}
输出:
以下功能开始运行,如下所示。
接下来,我向 Azure 服务总线中的输入队列(kamqueue)发送一条消息,如下所示。
我在 Azure 门户的输出队列中收到了上述输入队列消息。
我成功从输入队列(kamqueue)发送消息并在输出队列中接收它。
Azure Functions Core Tools
Core Tools Version: 4.0.5504 Commit hash: N/A +f829589bcaxxxxxxxxxxxxxx (64-bit)
Function Runtime Version: 4.28.3.21820
[2024-02-17T02:04:55.530Z] Found C:\Users\xxxxxx\source\repos\FunctionApp101\FunctionApp101\FunctionApp101.csproj. Using for user secrets file configuration.
[2024-02-17T02:05:05.349Z] Azure Functions .NET Worker (PID: 4948) initialized in debug mode. Waiting for debugger to attach...
[2024-02-17T02:05:05.590Z] Worker process started and initialized.
Functions:
Function1: serviceBusTrigger
For detailed output, run func with --verbose flag.
[2024-02-17T02:05:10.915Z] Host lock lease acquired by instance ID '0000000000xxxxxxxxxxxxxxxx'.
[2024-02-17T02:05:40.240Z] Executing 'Functions.Function1' (Reason='(null)', Id=4129fa78xxxxxxxxxxxxx)
[2024-02-17T02:05:40.244Z] Trigger Details: MessageId: b6103f31586540888341xxxxxxxxxxxx, SequenceNumber: 4, DeliveryCount: 1, EnqueuedTimeUtc: 2024-02-17T02:05:39.2850000+00:00, LockedUntilUtc: 2024-02-17T02:06:39.3010000+00:00, SessionId: (null)
[2024-02-17T02:05:41.060Z] Message Body: Hi Kamali
[2024-02-17T02:05:41.060Z] Message ID: b6103f31586540888341xxxxxxxxxxx
[2024-02-17T02:05:41.060Z] Message Content-Type: (null)
[2024-02-17T02:05:45.300Z] Start processing HTTP request POST http://127.0.0.1:50310/Settlement/Complete
[2024-02-17T02:05:45.305Z] Sending HTTP request POST http://127.0.0.1:50310/Settlement/Complete
[2024-02-17T02:05:45.914Z] Received HTTP response headers after 592.3766ms - 200
[2024-02-17T02:05:45.916Z] End processing HTTP request after 630.5462ms - 200
[2024-02-17T02:05:46.065Z] Executed 'Functions.Function1' (Succeeded, Id=4129fa78xxxxxxxxxxxxx, Duration=5982ms)