如何批量处理 ChannelReader<T>,在消费和处理任何单个项目之间强制执行最大间隔策略?

问题描述 投票:0回答:1

我在生产者-消费者场景中使用

Channel<T>
,并且我需要以每批10个项目的方式消耗通道,并且不允许任何消耗的项目在缓冲区中保持空闲超过5秒。此持续时间是从通道读取项目与处理包含该项目的批次之间允许的最大延迟。最大延迟策略优先于批量大小策略,因此即使少于 10 个项目也应该处理一个批次,以满足最大延迟要求。

我能够以

ReadAllBatches
 类的 
ChannelReader<T>
扩展方法的形式实现第一个要求:

public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
    this ChannelReader<T> channelReader, int batchSize)
{
    List<T> buffer = new();
    while (true)
    {
        T item;
        try { item = await channelReader.ReadAsync(); }
        catch (ChannelClosedException) { break; }
        buffer.Add(item);
        if (buffer.Count == batchSize)
        {
            yield return buffer.ToArray();
            buffer.Clear();
        }
    }
    if (buffer.Count > 0) yield return buffer.ToArray();
    await channelReader.Completion; // Propagate possible failure
}

我打算这样使用它:

await foreach (Item[] batch in myChannel.Reader.ReadAllBatches(10))
{
    Console.WriteLine($"Processing batch of {batch.Length} items");
}

我的问题是: 如何使用额外的

ReadAllBatches<T>
参数来增强我的
TimeSpan timeout
实现,以强制执行上述最大延迟策略,而不需要在我的项目中安装第三方软件包?

重要提示:所请求的实现不应受到此处报告的内存泄漏问题的影响。因此,消耗通道的循环不应导致应用程序使用的内存稳定增加,以防在通道中写入项目的生产者长时间处于空闲状态。

注意:我知道关于批处理IAsyncEnumerable<T>

接口,存在类似的问题,但我对此不感兴趣。我对出于性能原因,直接针对 
ChannelReader<T>
类型的方法。

c# memory-leaks producer-consumer iasyncenumerable system.threading.channels
1个回答
3
投票

下面是由 ">GitHub 上发布的想法的实现。

我有同样的“泄漏”问题并通过以下方式解决:

  • 第一次读取使用主令牌(如果我没有要处理的物品,只需等待一个到达)
  • 所有剩余项目必须在 x 毫秒内读取完毕

这样我就永远不会因为超时取消令牌而得到空读取(好吧,当应用程序关闭时可能会得到一个空读取),并且当项目从通道的编写器到达时我会得到正确的行为。

内部

CancellationTokenSource
使用计时器安排取消,在消耗批次中的第一个元素后立即取消。

/// <summary>
/// Reads all of the data from the channel in batches, enforcing a maximum
/// interval policy between consuming an item and emitting it in a batch.
/// </summary>
public static IAsyncEnumerable<T[]> ReadAllBatches<T>(
    this ChannelReader<T> source, int batchSize, TimeSpan timeSpan)
{
    ArgumentNullException.ThrowIfNull(source);
    if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
    if (timeSpan < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(timeSpan));
    return Implementation();

    async IAsyncEnumerable<T[]> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        CancellationTokenSource timerCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        try
        {
            List<T> buffer = new();
            while (true)
            {
                CancellationToken token = buffer.Count == 0 ?
                    cancellationToken : timerCts.Token;
                (T Value, bool HasValue) item;
                try
                {
                    item = (await source.ReadAsync(token).ConfigureAwait(false), true);
                }
                catch (ChannelClosedException) { break; }
                catch (OperationCanceledException)
                {
                    if (cancellationToken.IsCancellationRequested) break;
                    // Timeout occurred.
                    Debug.Assert(timerCts.IsCancellationRequested);
                    Debug.Assert(buffer.Count > 0);
                    item = default;
                }
                if (buffer.Count == 0) timerCts.CancelAfter(timeSpan);
                if (item.HasValue)
                {
                    buffer.Add(item.Value);
                    if (buffer.Count < batchSize) continue;
                }
                yield return buffer.ToArray();
                buffer.Clear();
                if (!timerCts.TryReset())
                {
                    timerCts.Dispose();
                    timerCts = CancellationTokenSource
                        .CreateLinkedTokenSource(cancellationToken);
                }
            }
            // Emit what's left before throwing exceptions.
            if (buffer.Count > 0) yield return buffer.ToArray();

            cancellationToken.ThrowIfCancellationRequested();

            // Propagate possible failure of the channel.
            if (source.Completion.IsCompleted)
                await source.Completion.ConfigureAwait(false);
        }
        finally { timerCts.Dispose(); }
    }
}

使用示例:

await foreach (Item[] batch in myChannel.Reader
    .ReadAllBatches(10, TimeSpan.FromSeconds(5)))
{
    Console.WriteLine($"Processing batch of {batch.Length} items");
}

此实现是非破坏性的,这意味着从通道中消耗的物品不会有丢失的危险。如果枚举被取消或通道出现故障,任何消耗的项目都将在错误传播之前在最后一批中发出。

注意: 如果来源

ChannelReader<T>
在取消
cancellationToken
的同时完成,则取消优先于完成。这与
ChannelReader<T>
ChannelWriter<T>
类的所有本机方法的行为相同。这意味着即使所有工作都已完成,也有可能(尽管很少见)抛出
OperationCanceledException

© www.soinside.com 2019 - 2024. All rights reserved.