我正在尝试了解 Polly 的速率限制政策。
public class RateLimiter
{
private readonly AsyncRateLimitPolicy _throttlingPolicy;
private readonly Action<string> _rateLimitedAction;
public RateLimiter(int numberOfExecutions, TimeSpan perTimeSpan, Action<string> rateLimitedAction)
{
_throttlingPolicy = Policy.RateLimitAsync(numberOfExecutions, perTimeSpan);
_rateLimitedAction = rateLimitedAction;
}
public async Task<T> Throttle<T>(Func<Task<T>> func)
{
var result = await _throttlingPolicy.ExecuteAndCaptureAsync(func);
if (result.Outcome == OutcomeType.Failure)
{
var retryAfter = (result.FinalException as RateLimitRejectedException)?.RetryAfter ?? TimeSpan.FromSeconds(1);
_rateLimitedAction($"Rate limited. Should retry in {retryAfter}.");
return default;
}
return result.Result;
}
}
在我的控制台应用程序中,我正在实例化一个
RateLimiter
,每 10 秒最多 5 次调用。
var rateLimiter = new RateLimiter(5, TimeSpan.FromSeconds(10), err => Console.WriteLine(err));
var rdm = new Random();
while (true)
{
var result = await rateLimiter.Throttle(() => Task.FromResult(rdm.Next(1, 10)));
if (result != default) Console.WriteLine($"Result: {result}");
await Task.Delay(200);
}
我预计会看到 5 个结果,并且第 6 个结果的速率受到限制。但这就是我得到的
Result: 9
Rate limited. Should retry in 00:00:01.7744615.
Rate limited. Should retry in 00:00:01.5119933.
Rate limited. Should retry in 00:00:01.2313921.
Rate limited. Should retry in 00:00:00.9797322.
Rate limited. Should retry in 00:00:00.7309150.
Rate limited. Should retry in 00:00:00.4812646.
Rate limited. Should retry in 00:00:00.2313643.
Result: 7
Rate limited. Should retry in 00:00:01.7982864.
Rate limited. Should retry in 00:00:01.5327321.
Rate limited. Should retry in 00:00:01.2517093.
Rate limited. Should retry in 00:00:00.9843077.
Rate limited. Should retry in 00:00:00.7203371.
Rate limited. Should retry in 00:00:00.4700262.
Rate limited. Should retry in 00:00:00.2205184.
我也尝试过使用
ExecuteAsync
代替 ExecuteAndCaptureAsync
并且它并没有改变结果。
public async Task<T> Throttle<T>(Func<Task<T>> func)
{
try
{
var result = await _throttlingPolicy.ExecuteAsync(func);
return result;
}
catch (RateLimitRejectedException ex)
{
_rateLimitedAction($"Rate limited. Should retry in {ex.RetryAfter}.");
return default;
}
}
这对我来说没有任何意义。我有什么遗漏的吗?
速率限制器的工作方式与您想象的有点不同。
假设我有 500 个请求,我想将其限制为每分钟 50 个。
期望:前 50 次执行后,如果执行时间少于一分钟,则速率限制器将启动。
这种直观的方法没有考虑输入负载的平均分配。这可能会导致以下可观察到的行为:
Polly 的速率限制器使用漏桶算法
其工作原理如下:
所以,从技术上来说:
上述描述中最重要的信息如下:漏桶算法使用恒定速率清空桶。
更新 14/11/22
让我纠正一下自己。 Polly 的速率限制器使用的是 token 桶,而不是 leaky 桶。还有其他算法,如固定窗口计数器、滑动窗口日志或滑动窗口计数器。您可以在此处或在系统设计访谈第1卷书的第4章
中阅读有关替代方案的信息那么,我们来谈谈令牌桶算法:
(来源)
如果我们仔细检查实施情况,我们可以看到以下内容:
RateLimiterPolicy
调用RateLimiterEngine
的静态方法RateLimiterEngine
调用IRateLimiter
接口上的方法
RateLimiterFactory
公开了创建LockFreeTokenBucketRateLimiter
public static IRateLimiter Create(TimeSpan onePer, int bucketCapacity)
=> new LockFreeTokenBucketRateLimiter(onePer, bucketCapacity);
请注意参数的命名方式(
onePer
和bucketCapacity
)!
如果您对实际实施感兴趣,那么您可以找到这里。 (几乎每一行都有注释)
我想再强调一件事。速率限制器“不”执行任何重试。如果你想在惩罚时间结束后继续执行,那么你必须自己执行。通过编写一些自定义代码或将重试策略与速率限制器策略相结合。
maxBurst
:
单次突发中允许的最大执行次数(例如,如果一段时间内没有执行)。默认值是
1
,如果您将其设置为
numberOfExecutions
,您将看到第一次执行所需的效果,尽管之后它会恶化到与您观察到的类似模式(我猜它是基于限制器如何“释放”资源和 var onePer = TimeSpan.FromTicks(perTimeSpan.Ticks / numberOfExecutions);
计算,但我没有挖得太深,但根据文档和代码,速率限制似乎发生在“每 perTimeSpan
/numberOfExecutions
执行 1 次”速率而不是“任何选定的 numberOfExecutions
中的 perTimeSpan
”):_throttlingPolicy = Policy.RateLimitAsync(numberOfExecutions, perTimeSpan, numberOfExecutions);
添加定期等待几秒钟会带来“突发”。
另请参阅: