我必须调用一个外部HTTP API,该API仅每4秒通过userId允许一个请求。只要我每次都调用此API发送不同的userId,我就可以以任何速率调用它。
在此代码中,我能够遵守外部API速率,但是我并没有以最佳方式执行此操作,因为某些请求被先前的调用阻止,即使该userId不需要等待也是如此。 (检查代码中的注释)
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp2
{
class Program
{
static void Main(string[] args)
{
var caller = new ExternalAPICaller();
caller.RunCalls();
Console.ReadKey();
}
}
public class ExternalAPICaller
{
private static SemaphoreSlim throttler = new SemaphoreSlim(20); // up to 20 concurrent calls
private static List<string> userIds = new List<string>();
private const int rateLimitByUser = 4000;
public async Task CallAPIWithThrottling(string userId)
{
if (userIds.Contains(userId)) Thread.Sleep(rateLimitByUser);
userIds.Add(userId);
await throttler.WaitAsync();
var task = MockHttpCall(userId);
_ = task.ContinueWith(async s =>
{
await Task.Delay(rateLimitByUser);
throttler.Release();
userIds.Remove(userId);
});
}
public Task MockHttpCall(string id)
{
Console.WriteLine("http call for " + id);
Thread.Sleep(300);
return Task.CompletedTask;
}
public async Task RunCalls()
{
await CallAPIWithThrottling("Mike");
await CallAPIWithThrottling("John");
await CallAPIWithThrottling("Sarah");
await CallAPIWithThrottling("Matt");
await CallAPIWithThrottling("John");
await CallAPIWithThrottling("Jacob"); // this should be called right away, but the second John makes it wait
await CallAPIWithThrottling("John");
await CallAPIWithThrottling("Amy"); // this should be called right away, but the thrid John makes it wait
}
}
}
我将尝试抽象节流功能,以便我可以对其进行独立测试。我将创建一个Throttler
类,该类可以配置为全局和每用户并发限制和延迟。在您的情况下,配置为:
全局并发限制:20全局延迟:0(允许同时请求不同的用户)每个用户的并发限制:1每位用户延迟:4000
这里是Throttler
类的实现。
public class Throttler<TKey>
{
private readonly SemaphoreSlim _globalSemaphore;
private readonly int _globalDelay;
private readonly int _perKeyConcurrencyLimit;
private readonly int _perKeyDelay;
private readonly ConcurrentDictionary<TKey, SemaphoreSlim> _perKeySemaphores;
public Throttler(int globalConcurrencyLimit, int globalDelay,
int perKeyConcurrencyLimit, int perKeyDelay)
{
_globalSemaphore = new SemaphoreSlim(globalConcurrencyLimit);
_globalDelay = globalDelay;
_perKeyConcurrencyLimit = perKeyConcurrencyLimit;
_perKeyDelay = perKeyDelay;
_perKeySemaphores = new ConcurrentDictionary<TKey, SemaphoreSlim>();
}
public async Task<TResult> Execute<TResult>(TKey key,
Func<Task<TResult>> taskFactory)
{
var perKeySemaphore = _perKeySemaphores.GetOrAdd(
key, _ => new SemaphoreSlim(_perKeyConcurrencyLimit));
await perKeySemaphore.WaitAsync().ConfigureAwait(false);
await _globalSemaphore.WaitAsync().ConfigureAwait(false);
try
{
var task = taskFactory();
return await task.ConfigureAwait(false);
}
finally
{
ReleaseSemaphoresAsync();
}
async void ReleaseSemaphoresAsync()
{
var globalDelayTask = Task.Delay(_globalDelay);
var perKeyDelayTask = Task.Delay(_perKeyDelay);
await globalDelayTask.ConfigureAwait(false);
_globalSemaphore.Release();
await perKeyDelayTask.ConfigureAwait(false);
perKeySemaphore.Release();
}
}
}
在此实现中,即使taskFactory
方法失败,也会强制执行延迟。更加精进的实现可能会区分出工厂方法中的异常和任务等待中的异常。
用法示例:
var throttler = new Throttler<string>(20, 0, 1, 4000);
var keys = new string[] { "Mike", "John", "Sarah", "Matt", "John", "Jacob",
"John", "Amy" };
var tasks = new List<Task>();
foreach (var key in keys)
{
tasks.Add(throttler.Execute(key, () => MockHttpCall(key)));
}
Task.WaitAll(tasks.ToArray());
async Task<int> MockHttpCall(string id)
{
Console.WriteLine($"> {DateTime.Now:HH:mm:ss.fff} HTTP call for " + id);
await Task.Delay(300);
return 0;
}
输出:
11:20:41.635对Mike的HTTP调用11:20:41.652对John的HTTP调用11:20:41.652对Sarah的HTTP调用11:20:41.652 Matt的HTTP调用11:20:41.653对Jacob的HTTP调用11:20:41.654 Amy的HTTP调用11:20:45.965 John的HTTP调用11:20:50.272 John的HTTP调用