分区:如何在每个分区后添加等待

问题描述 投票:0回答:1

我有一个API,每分钟接受20个请求,之后,我需要等待1分钟才能查询它。我有一个项目列表(通常超过 1000 个),我需要从 API 查询其详细信息,我的想法是我可以使用

Partitioner
将我的列表划分为 20 个项目/请求,但很快我意识到
Partitioner
不起作用像这样,我的第二个想法是在分区中添加一个
delay
,但这也是一个坏主意,根据我的理解,它在每个不需要的请求之后添加一个延迟,相反,我需要在每个
Partition
之后延迟。下面是我的代码:

public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate {
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                {
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}

有谁知道我怎样才能做到这一点?

c# parallel-processing task-parallel-library rate-limiting
1个回答
8
投票

这里是一个

RateLimiter
类,您可以使用它来限制异步操作的频率。这是
RateLimiter
类的更简单实现,可以在 this 答案中找到。

/// <summary>
/// Limits the number of workers that can access a resource, during the specified
/// time span.
/// </summary>
public class RateLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Schedule the release of the semaphore on the ThreadPool.
        // Ensure that the _timeUnit is valid before the scheduling.
        Task task;
        try { task = Task.Delay(_timeUnit); } catch { _semaphore.Release(); throw; }
        ThreadPool.QueueUserWorkItem(async state =>
        {
            try { await (Task)state; } finally { _semaphore.Release(); }
        }, task);
    }
}

使用示例:

List<string> urls = GetUrls();

using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));

在线演示.

注意: 此实现存在漏洞,因为它会启动内部异步

Task.Delay
操作,当您使用完
RateLimiter
后无法取消该操作。除了消耗与活动
RateLimiter
任务相关的资源之外,任何挂起的异步操作都会阻止
Task.Delay
及时被垃圾收集。此外,
SemaphoreSlim
也没有按照应有的方式进行处理。这些都是小缺陷,可能不会影响仅创建少量 RateLimiter 的程序。如果您打算创建很多它们,您可以查看此答案的
第三次修订版
,其中包含一次性RateLimiter

这里是
RateLimiter

类的替代实现,更复杂,它基于

Environment.TickCount64
 属性而不是 
SemaphoreSlim
。它的优点是不会在后台启动隐藏的异步操作。缺点是
WaitAsync
方法不支持
CancellationToken
参数,并且由于复杂性,出现错误的概率较高。
public class RateLimiter
{
    private readonly Queue<long> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly int _timeUnitMilliseconds;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _queue = new Queue<long>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnitMilliseconds = checked((int)timeUnit.TotalMilliseconds);
    }

    public Task WaitAsync()
    {
        int delayMilliseconds = 0;
        lock (_queue)
        {
            long currentTimestamp = Environment.TickCount64;
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                long refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delayMilliseconds = checked((int)(refTimestamp - currentTimestamp));
                Debug.Assert(delayMilliseconds >= 0);
                if (delayMilliseconds < 0) delayMilliseconds = 0; // Just in case
            }
            _queue.Enqueue(currentTimestamp + delayMilliseconds
                + _timeUnitMilliseconds);
        }
        if (delayMilliseconds == 0) return Task.CompletedTask;
        return Task.Delay(delayMilliseconds);
    }
}

© www.soinside.com 2019 - 2024. All rights reserved.