我们正在使用服务总线主题触发器Azure函数,并且我们计划在Azure函数中实现一个简单的行为,如果在处理/处理过程中出现任何异常,我们希望推迟下一次重试。
目前我们计划使用
[ExponentialBackoffRetry]
属性,如下面的代码所示。
但是我们可以使用Polly重试来代替
[ExponentialBackoffRetry]
吗?基本上哪种方法对于我们的要求来说是空闲的 - [ExponentialBackoffRetry]
或 Polly 重试
以下是我们的服务总线主题触发器Azure功能:
[FunctionName(nameof(CardGroupEventSubscriber))]
[ExponentialBackoffRetry(5, "00:00:04", "00:01:00")]
public async Task RunAsync([ServiceBusTrigger("%ServiceBusConfigOptions:TopicEventTypeName%", "%ServiceBusConfigOptions:TopicEventTypeSubscription%",
Connection = "ServiceBusConfigOptions:ConnectionString")]
string sbMsg)
{
try
{
var message = sbMsg.AsPoco<CardGroupEvent>();
_logger.LogInformation("{class} - {method} - {RequestId} - Start",
nameof(CardGroupEventSubscriber), nameof(CardGroupEventSubscriber.RunAsync), message.RequestID);
_logger.LogInformation($"Started processing message {message.AsJson()} with", nameof(CardGroupEventSubscriber));
var validationResult = new CardGroupEventValidator().Validate(message);
if (validationResult.IsValid)
{
await _processor.ProcessAsync(message);
}
catch (Exception ex)
{
_logger.LogError($"Unable to process card group event {sbMsg.AsJson()} with {nameof(CardGroupEventSubscriber)}," +
$" ExceptionMessage:{ex.Message}, StackTrace: {ex.StackTrace}");
throw;
}
#endregion
}
Polly 的策略可以以命令式的方式定义和使用。
而
ExponentialBackoffRetry
属性可以被视为声明性的。
那么,假设您想要定义一个策略
CosmosException
时
然后你这样做:const int maxRetryAttempts = 10;
const int oneSecondInMilliSeconds = 1000;
const int maxDelayInMilliseconds = 32 * oneSecondInMilliSeconds;
var jitterer = new Random();
var policy = Policy
.Handle<CosmosException>()
.WaitAndRetryAsync(
maxRetryAttempts,
retryAttempt =>
{
var calculatedDelayInMilliseconds = Math.Pow(2, retryAttempt) * oneSecondInMilliSeconds;
var jitterInMilliseconds = jitterer.Next(0, oneSecondInMilliSeconds);
var actualDelay = Math.Min(calculatedDelayInMilliseconds + jitterInMilliseconds, maxDelayInMilliseconds);
return TimeSpan.FromMilliseconds(actualDelay);
}
);
Polly.Contrib.WaitAndRetry
)现在让我们将其应用到您的
RunAsync
方法中
[FunctionName(nameof(CardGroupEventSubscriber))]
public async Task RunAsync([ServiceBusTrigger("%ServiceBusConfigOptions:TopicEventTypeName%", "%ServiceBusConfigOptions:TopicEventTypeSubscription%",
Connection = "ServiceBusConfigOptions:ConnectionString")]
string sbMsg)
=> await GetExponentialBackoffRetryPolicy.ExecuteAsync(() => RunCoreAsync(sbMsg));
private async Task RunCoreAsync(string sbMsg)
{
try
...
}
RunAsync
的代码移至 RunCoreAsync
方法中RunAsync
实现替换为单行,它创建了上述策略,然后装饰 RunCoreAsync
旁注:对于 CosmosDb,以不同的方式处理速率限制/节流可能是有意义的。
当我收到
CosmosException
并且 StatusCode
是 429 时,然后使用 RetryAfter
的值来延迟重试,就像这样
var policy = Policy
.Handle<CosmosException>(ex => ex.StatusCode == HttpStatusCode.TooManyRequests)
.WaitAndRetryAsync(maxRetryAttempts,
sleepDurationProvider:(_, ex, __) => ((CosmosException)ex).RetryAfter.Value,
onRetryAsync: (_, __, ___, ____) => Task.CompletedTask);
更新#1:结合两项政策
如果您愿意,您可以结合上述两项政策。您所需要做的就是使它们独立。因此,无论发生什么情况,都只应触发其中一项策略。最简单的解决方案是将此
ex => ex.StatusCode != HttpStatusCode.TooManyRequests
谓词传递给指数退避策略
IAsyncPolicy GetExponentialBackoffRetryPolicy()
=> Policy
.Handle<CosmosException>(ex => ex.StatusCode != HttpStatusCode.TooManyRequests)
.WaitAndRetryAsync(
maxRetryAttempts,
retryAttempt =>
{
var calculatedDelayInMilliseconds = Math.Pow(2, retryAttempt) * oneSecondInMilliSeconds;
var jitterInMilliseconds = jitterer.Next(0, oneSecondInMilliSeconds);
var actualDelay = Math.Min(calculatedDelayInMilliseconds + jitterInMilliseconds, maxDelayInMilliseconds);
return TimeSpan.FromMilliseconds(actualDelay);
}
);
IAsyncPolicy GetThrottlingAwareRetryPolicy()
=> Policy
.Handle<CosmosException>(ex => ex.StatusCode == HttpStatusCode.TooManyRequests)
.WaitAndRetryAsync(maxRetryAttempts,
sleepDurationProvider: (_, ex, __) => ((CosmosException)ex).RetryAfter.Value,
onRetryAsync: (_, __, ___, ____) => Task.CompletedTask);
为了将这两者合而为一,你有很多选择,我建议使用
Policy.WrapAsync
IAsyncPolicy retryPolicy = Policy.WrapAsync(GetExponentialBackoffRetryPolicy(), GetThrottlingAwareRetryPolicy());
//OR
IAsyncPolicy retryPolicy = Policy.WrapAsync(GetThrottlingAwareRetryPolicy(), GetExponentialBackoffRetryPolicy());
这里的顺序并不重要,因为它们是独立的策略。