我们在单个 Azure Functions 实例/项目中拥有许多用于服务总线的 Azure Functions。问题是某些 Azure 服务总线的负载非常高,从而导致资源匮乏。是否可以通过代码单独限制那些高负载的 Azure Functions?如果可以的话,我们可以采用什么方法?
我同意@Sean Feldman
参考这里。
基于目标的缩放可以同时放大四个实例,缩放确定依赖于一个简单的方程:
为了说明这一点,方程式如下:每个实例
desired instances = event source length / target executions
。
每个实例的目标执行的默认值源自 Azure Functions 扩展所使用的 SDK。无需进行任何调整即可使基于目标的扩展有效发挥作用。
当同一功能应用程序中的多个功能同时请求扩展时,将计算跨功能的这些请求的总和,以确定所需实例中的调整。扩展请求优先于缩减请求。
如果存在缩容请求而没有相应的扩容请求,则使用最大缩容值。
Azure Functions 使您能够设置每个函数的最大并发执行数,从而使您能够有效管理同时执行并防止资源耗尽。您可以在 host.json 文件中配置并发限制,如下所示:-
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
},
"enableLiveMetricsFilters": true
}
},
"extensions": {
"queues": {
"maxConcurrentCalls": 16,
"visibilityTimeout": "00:05:00"
}
}
}
此外,在函数中实施反压、速率限制和断路器模式,以有效处理高负载。当负载超过阈值时,背压会减慢或暂时停止消息接受。速率限制控制消息处理速率,以防止下游系统不堪重负。断路器模式会在持续故障或高负载期间暂时停止消息处理,从而防止级联故障。
这是我的 Azure Functions 服务总线队列触发器 C# 代码,用于实现反压处理、速率限制和断路器逻辑:-
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
namespace FunctionApp7
{
public class Function1
{
private static readonly Random _random = new Random();
private static bool _circuitBreakerOpen = false;
private static DateTime _circuitBreakerResetTime = DateTime.MinValue;
private static int _concurrencyLimit = 16; // Initial concurrency limit
private static SemaphoreSlim _rateLimitSemaphore = new SemaphoreSlim(10, 10); // Initial rate limit
[FunctionName("Function1")]
public async Task RunAsync([ServiceBusTrigger("myqueue", Connection = "conn")] string myQueueItem, ILogger log)
{
try
{
// Circuit breaker logic
if (_circuitBreakerOpen && DateTime.UtcNow < _circuitBreakerResetTime)
{
log.LogWarning("Circuit breaker is open. Message processing skipped.");
return;
}
// Rate limiting
await _rateLimitSemaphore.WaitAsync();
// Backpressure handling
if (Interlocked.Increment(ref _concurrencyLimit) > 20) // Adjust as needed
{
log.LogWarning("Concurrency limit reached. Backing off.");
await Task.Delay(TimeSpan.FromSeconds(5)); // Adjust backoff duration as needed
Interlocked.Decrement(ref _concurrencyLimit);
return;
}
// Simulate processing
await SimulateProcessing();
log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
}
catch (Exception ex)
{
log.LogError(ex, $"Error processing message: {myQueueItem}");
// Implement circuit breaker logic here
_circuitBreakerOpen = true;
_circuitBreakerResetTime = DateTime.UtcNow.AddMinutes(5); // Reset circuit breaker after 5 minutes
}
finally
{
_rateLimitSemaphore.Release();
Interlocked.Decrement(ref _concurrencyLimit);
}
}
private async Task SimulateProcessing()
{
// Simulate processing time
await Task.Delay(_random.Next(100, 1000));
}
}
}
输出:-