我有一个API,每分钟接受20个请求,之后,我需要等待1分钟才能查询它。我有一个项目列表(通常超过 1000 个),我需要从 API 查询其详细信息,我的想法是我可以使用
Partitioner
将我的列表划分为 20 个项目/请求,但很快我意识到 Partitioner
不起作用像这样,我的第二个想法是在分区中添加一个delay
,但这也是一个坏主意,根据我的理解,它在每个不需要的请求之后添加一个延迟,相反,我需要在每个Partition
之后延迟。下面是我的代码:
public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
[Optional] int delay)
{
var whenAll = await Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
select Task.Run(async delegate {
var allResponses = new List<V>();
using (partition)
while (partition.MoveNext())
{
allResponses.Add(await body(partition.Current));
await Task.Delay(TimeSpan.FromSeconds(delay));
}
return allResponses;
}, token));
return whenAll.SelectMany(x => x);
}
有谁知道我怎样才能做到这一点?
这里是一个
RateLimiter
类,您可以使用它来限制异步操作的频率。这是 RateLimiter
类的更简单实现,可以在 this 答案中找到。
/// <summary>
/// Limits the number of workers that can access a resource, during the specified
/// time span.
/// </summary>
public class RateLimiter
{
private readonly SemaphoreSlim _semaphore;
private readonly TimeSpan _timeUnit;
public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
{
if (maxActionsPerTimeUnit < 1)
throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
throw new ArgumentOutOfRangeException(nameof(timeUnit));
_semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
_timeUnit = timeUnit;
}
public async Task WaitAsync(CancellationToken cancellationToken = default)
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
// Schedule the release of the semaphore on the ThreadPool.
// Ensure that the _timeUnit is valid before the scheduling.
Task task;
try { task = Task.Delay(_timeUnit); } catch { _semaphore.Release(); throw; }
ThreadPool.QueueUserWorkItem(async state =>
{
try { await (Task)state; } finally { _semaphore.Release(); }
}, task);
}
}
使用示例:
List<string> urls = GetUrls();
using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));
string[] documents = await Task.WhenAll(urls.Select(async url =>
{
await rateLimiter.WaitAsync();
return await _httpClient.GetStringAsync(url);
}));
在线演示.
注意: 此实现存在漏洞,因为它会启动内部异步
Task.Delay
操作,当您使用完 RateLimiter
后无法取消该操作。除了消耗与活动 RateLimiter
任务相关的资源之外,任何挂起的异步操作都会阻止 Task.Delay
及时被垃圾收集。此外,SemaphoreSlim
也没有按照应有的方式进行处理。这些都是小缺陷,可能不会影响仅创建少量 RateLimiter
的程序。如果您打算创建很多它们,您可以查看此答案的第三次修订版,其中包含一次性
RateLimiter
。
这里是 RateLimiter
类的替代实现,更复杂,它基于
Environment.TickCount64
属性而不是
SemaphoreSlim
。它的优点是不会在后台启动隐藏的异步操作。缺点是 WaitAsync
方法不支持 CancellationToken
参数,并且由于复杂性,出现错误的概率较高。public class RateLimiter
{
private readonly Queue<long> _queue;
private readonly int _maxActionsPerTimeUnit;
private readonly int _timeUnitMilliseconds;
public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
{
// Arguments validation omitted
_queue = new Queue<long>();
_maxActionsPerTimeUnit = maxActionsPerTimeUnit;
_timeUnitMilliseconds = checked((int)timeUnit.TotalMilliseconds);
}
public Task WaitAsync()
{
int delayMilliseconds = 0;
lock (_queue)
{
long currentTimestamp = Environment.TickCount64;
while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
{
_queue.Dequeue();
}
if (_queue.Count >= _maxActionsPerTimeUnit)
{
long refTimestamp = _queue
.Skip(_queue.Count - _maxActionsPerTimeUnit).First();
delayMilliseconds = checked((int)(refTimestamp - currentTimestamp));
Debug.Assert(delayMilliseconds >= 0);
if (delayMilliseconds < 0) delayMilliseconds = 0; // Just in case
}
_queue.Enqueue(currentTimestamp + delayMilliseconds
+ _timeUnitMilliseconds);
}
if (delayMilliseconds == 0) return Task.CompletedTask;
return Task.Delay(delayMilliseconds);
}
}