如何通过代码对Azure Function进行限流

问题描述 投票:0回答:1

我们在单个 Azure Functions 实例/项目中拥有许多用于服务总线的 Azure Functions。问题是某些 Azure 服务总线的负载非常高,从而导致资源匮乏。是否可以通过代码单独限制那些高负载的 Azure Functions?如果可以的话,我们可以采用什么方法?

azure azure-functions azure-servicebus-queues
1个回答
0
投票

我同意@Sean Feldman

参考这里

基于目标的缩放可以同时放大四个实例,缩放确定依赖于一个简单的方程:

为了说明这一点,方程式如下:每个实例

desired instances = event source length / target executions

每个实例的目标执行的默认值源自 Azure Functions 扩展所使用的 SDK。无需进行任何调整即可使基于目标的扩展有效发挥作用。

当同一功能应用程序中的多个功能同时请求扩展时,将计算跨功能的这些请求的总和,以确定所需实例中的调整。扩展请求优先于缩减请求。

如果存在缩容请求而没有相应的扩容请求,则使用最大缩容值。

Azure Functions 使您能够设置每个函数的最大并发执行数,从而使您能够有效管理同时执行并防止资源耗尽。您可以在 host.json 文件中配置并发限制,如下所示:-

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      },
      "enableLiveMetricsFilters": true
    }
  },
  "extensions": {
    "queues": {
      "maxConcurrentCalls": 16, 
      "visibilityTimeout": "00:05:00" 
    }
  }
}

此外,在函数中实施反压、速率限制和断路器模式,以有效处理高负载。当负载超过阈值时,背压会减慢或暂时停止消息接受。速率限制控制消息处理速率,以防止下游系统不堪重负。断路器模式会在持续故障或高负载期间暂时停止消息处理,从而防止级联故障。

这是我的 Azure Functions 服务总线队列触发器 C# 代码,用于实现反压处理、速率限制和断路器逻辑:-

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;

namespace FunctionApp7
{
    public class Function1
    {
        private static readonly Random _random = new Random();
        private static bool _circuitBreakerOpen = false;
        private static DateTime _circuitBreakerResetTime = DateTime.MinValue;
        private static int _concurrencyLimit = 16; // Initial concurrency limit
        private static SemaphoreSlim _rateLimitSemaphore = new SemaphoreSlim(10, 10); // Initial rate limit

        [FunctionName("Function1")]
        public async Task RunAsync([ServiceBusTrigger("myqueue", Connection = "conn")] string myQueueItem, ILogger log)
        {
            try
            {
                // Circuit breaker logic
                if (_circuitBreakerOpen && DateTime.UtcNow < _circuitBreakerResetTime)
                {
                    log.LogWarning("Circuit breaker is open. Message processing skipped.");
                    return;
                }

                // Rate limiting
                await _rateLimitSemaphore.WaitAsync();

                // Backpressure handling
                if (Interlocked.Increment(ref _concurrencyLimit) > 20) // Adjust as needed
                {
                    log.LogWarning("Concurrency limit reached. Backing off.");
                    await Task.Delay(TimeSpan.FromSeconds(5)); // Adjust backoff duration as needed
                    Interlocked.Decrement(ref _concurrencyLimit);
                    return;
                }

                // Simulate processing
                await SimulateProcessing();

                log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
            }
            catch (Exception ex)
            {
                log.LogError(ex, $"Error processing message: {myQueueItem}");

                // Implement circuit breaker logic here
                _circuitBreakerOpen = true;
                _circuitBreakerResetTime = DateTime.UtcNow.AddMinutes(5); // Reset circuit breaker after 5 minutes
            }
            finally
            {
                _rateLimitSemaphore.Release();
                Interlocked.Decrement(ref _concurrencyLimit);
            }
        }

        private async Task SimulateProcessing()
        {
            // Simulate processing time
            await Task.Delay(_random.Next(100, 1000));
        }
    }
}

输出:-

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.