我们可以在服务总线主题触发器 Azure Function 中使用 Polly 重试来代替 ExponentialBackoffRetry 吗?

问题描述 投票:0回答:1

我们正在使用服务总线主题触发器Azure函数,并且我们计划在Azure函数中实现一个简单的行为,如果在处理/处理过程中出现任何异常,我们希望推迟下一次重试。

目前我们计划使用

[ExponentialBackoffRetry]
属性,如下面的代码所示。

但是我们可以使用Polly重试来代替

[ExponentialBackoffRetry]
吗?基本上哪种方法对于我们的要求来说是空闲的 -
[ExponentialBackoffRetry]
Polly 重试

以下是我们的服务总线主题触发器Azure功能:

 [FunctionName(nameof(CardGroupEventSubscriber))]
 [ExponentialBackoffRetry(5, "00:00:04", "00:01:00")]
 public async Task RunAsync([ServiceBusTrigger("%ServiceBusConfigOptions:TopicEventTypeName%", "%ServiceBusConfigOptions:TopicEventTypeSubscription%",
            Connection = "ServiceBusConfigOptions:ConnectionString")]
            string sbMsg)
        {
            try
            {   
                var message = sbMsg.AsPoco<CardGroupEvent>();

                _logger.LogInformation("{class} - {method} - {RequestId} - Start",
                   nameof(CardGroupEventSubscriber), nameof(CardGroupEventSubscriber.RunAsync), message.RequestID);

                _logger.LogInformation($"Started processing message {message.AsJson()} with", nameof(CardGroupEventSubscriber));

                var validationResult = new CardGroupEventValidator().Validate(message);

                if (validationResult.IsValid)
                {
                    await _processor.ProcessAsync(message);
                }
                
            catch (Exception ex)
            {
                _logger.LogError($"Unable to process card group event {sbMsg.AsJson()} with {nameof(CardGroupEventSubscriber)}," +
                    $" ExceptionMessage:{ex.Message}, StackTrace: {ex.StackTrace}");
                throw;
            }
            #endregion

        }
c# azure azure-functions azureservicebus polly
1个回答
1
投票

Polly 的策略可以以命令式的方式定义和使用。

ExponentialBackoffRetry
属性可以被视为声明性的。

那么,假设您想要定义一个策略

  • 进行带有抖动的指数退避
  • 仅当抛出
    CosmosException
    时 然后你这样做:
const int maxRetryAttempts = 10;
const int oneSecondInMilliSeconds = 1000;
const int maxDelayInMilliseconds = 32 * oneSecondInMilliSeconds;
var jitterer = new Random();
var policy = Policy
  .Handle<CosmosException>()
  .WaitAndRetryAsync(
      maxRetryAttempts,
      retryAttempt =>
      {
          var calculatedDelayInMilliseconds = Math.Pow(2, retryAttempt) * oneSecondInMilliSeconds;
          var jitterInMilliseconds = jitterer.Next(0, oneSecondInMilliSeconds);

          var actualDelay = Math.Min(calculatedDelayInMilliseconds + jitterInMilliseconds, maxDelayInMilliseconds);
          return TimeSpan.FromMilliseconds(actualDelay);
      }
  );
  • 我在这里自己实现了指数退避
    • 主要是为了减少包裹数量(所以,我们不需要
      Polly.Contrib.WaitAndRetry

现在让我们将其应用到您的

RunAsync
方法中

[FunctionName(nameof(CardGroupEventSubscriber))]
public async Task RunAsync([ServiceBusTrigger("%ServiceBusConfigOptions:TopicEventTypeName%", "%ServiceBusConfigOptions:TopicEventTypeSubscription%",
    Connection = "ServiceBusConfigOptions:ConnectionString")]
    string sbMsg)
  => await GetExponentialBackoffRetryPolicy.ExecuteAsync(() => RunCoreAsync(sbMsg));

private async Task RunCoreAsync(string sbMsg)
{
    try
    ...
}
  • 我已将原始
    RunAsync
    的代码移至
    RunCoreAsync
    方法中
  • 我已将
    RunAsync
    实现替换为单行,它创建了上述策略,然后装饰
    RunCoreAsync

旁注:对于 CosmosDb,以不同的方式处理速率限制/节流可能是有意义的。

当我收到

CosmosException
并且
StatusCode
是 429 时,然后使用
RetryAfter
的值来延迟重试,就像这样

var policy = Policy
    .Handle<CosmosException>(ex => ex.StatusCode == HttpStatusCode.TooManyRequests)
    .WaitAndRetryAsync(maxRetryAttempts,
       sleepDurationProvider:(_, ex, __) => ((CosmosException)ex).RetryAfter.Value,
       onRetryAsync: (_, __, ___, ____) => Task.CompletedTask);

更新#1:结合两项政策

如果您愿意,您可以结合上述两项政策。您所需要做的就是使它们独立。因此,无论发生什么情况,都只应触发其中一项策略。最简单的解决方案是将此

ex => ex.StatusCode != HttpStatusCode.TooManyRequests
谓词传递给指数退避策略

IAsyncPolicy GetExponentialBackoffRetryPolicy()
    => Policy
    .Handle<CosmosException>(ex => ex.StatusCode != HttpStatusCode.TooManyRequests)
    .WaitAndRetryAsync(
        maxRetryAttempts,
        retryAttempt =>
        {
            var calculatedDelayInMilliseconds = Math.Pow(2, retryAttempt) * oneSecondInMilliSeconds;
            var jitterInMilliseconds = jitterer.Next(0, oneSecondInMilliSeconds);

            var actualDelay = Math.Min(calculatedDelayInMilliseconds + jitterInMilliseconds, maxDelayInMilliseconds);
            return TimeSpan.FromMilliseconds(actualDelay);
        }
    );

IAsyncPolicy GetThrottlingAwareRetryPolicy()
    => Policy
    .Handle<CosmosException>(ex => ex.StatusCode == HttpStatusCode.TooManyRequests)
    .WaitAndRetryAsync(maxRetryAttempts,
        sleepDurationProvider: (_, ex, __) => ((CosmosException)ex).RetryAfter.Value,
        onRetryAsync: (_, __, ___, ____) => Task.CompletedTask);

为了将这两者合而为一,你有很多选择,我建议使用

Policy.WrapAsync

IAsyncPolicy retryPolicy = Policy.WrapAsync(GetExponentialBackoffRetryPolicy(), GetThrottlingAwareRetryPolicy());
//OR
IAsyncPolicy retryPolicy = Policy.WrapAsync(GetThrottlingAwareRetryPolicy(), GetExponentialBackoffRetryPolicy());

这里的顺序并不重要,因为它们是独立的策略。

© www.soinside.com 2019 - 2024. All rights reserved.