假设我有一个许多生产者,1个消费者未绑定的频道,有一个消费者:
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
}
问题在于
consume
函数进行一些 IO 访问,也可能进行一些网络访问,因此在使用 1 条消息之前可能会产生更多消息。但是由于不能并发访问IO资源,我不能有很多消费者,也不能把consume
函数丢到一个Task里就不管了
consume
函数可以轻松修改以获取多条消息并批量处理它们。所以我的问题是,是否有一种方法可以让消费者在尝试访问通道队列时获取通道队列中的all消息,如下所示:
while (true) {
Message[] messages = await channel.Reader.TakeAll();
await consumeAll(messages);
}
编辑:我能想到的 1 个选项是:
List<Message> messages = new();
await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
{
await consume(message);
Message msg;
while (channel.Reader.TryRead(out msg))
messages.Add(msg);
if (messages.Count > 0)
{
await consumeAll(messages);
messages.Clear();
}
}
但我觉得这应该是更好的方法。
在阅读 Stephen Toub 的频道入门书 后,我开始尝试编写一个扩展方法来满足您的需求(我已经有一段时间没有使用 C# 了,所以这很有趣)。
public static class ChannelReaderEx
{
public static async IAsyncEnumerable<IEnumerable<T>> ReadBatchesAsync<T>(
this ChannelReader<T> reader,
[EnumeratorCancellation] CancellationToken cancellationToken = default
)
{
while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
yield return reader.Flush().ToList();
}
}
public static IEnumerable<T> Flush<T>(this ChannelReader<T> reader)
{
while (reader.TryRead(out T item))
{
yield return item;
}
}
}
可以这样使用:
await foreach (var batch in channel.Reader.ReadBatchesAsync())
{
await ConsumeBatch(batch);
}
在
ChannelReader<T>
层面上解决这个问题,就像优秀的spender's answer一样,实用且足够,但在IAsyncEnumerable<T>
层面上解决它可能是一个应用范围更广的解决方案。下面是一个用于异步序列的扩展方法BufferImmediate
,它产生非空缓冲区,其中包含在拉取序列时立即可用的所有元素:
/// <summary>
/// Splits the sequence into chunks that contain all the elements that are
/// immediately available.
/// </summary>
public static IAsyncEnumerable<IList<TSource>> BufferImmediate<TSource>(
this IAsyncEnumerable<TSource> source, int maxSize = -1)
{
ArgumentNullException.ThrowIfNull(source);
if (maxSize == -1) maxSize = Array.MaxLength;
if (maxSize < 1) throw new ArgumentOutOfRangeException(nameof(maxSize));
return Implementation();
async IAsyncEnumerable<IList<TSource>> Implementation(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
using var linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(cancellationToken);
var enumerator = source.GetAsyncEnumerator(linkedCts.Token);
ValueTask<bool> moveNext = default;
try
{
moveNext = enumerator.MoveNextAsync();
List<TSource> buffer = new();
TSource[] ConsumeBuffer()
{
TSource[] array = buffer.ToArray();
buffer.Clear();
return array;
}
ExceptionDispatchInfo error = null;
while (true)
{
if (!moveNext.IsCompleted && buffer.Count > 0)
yield return ConsumeBuffer();
TSource item;
try
{
if (!await moveNext.ConfigureAwait(false)) break;
item = enumerator.Current;
}
catch (Exception ex)
{
error = ExceptionDispatchInfo.Capture(ex); break;
}
finally { moveNext = default; } // The ValueTask is consumed.
buffer.Add(item);
if (buffer.Count == maxSize) yield return ConsumeBuffer();
try { moveNext = enumerator.MoveNextAsync(); }
catch (Exception ex)
{
error = ExceptionDispatchInfo.Capture(ex); break;
}
}
if (buffer.Count > 0) yield return ConsumeBuffer();
error?.Throw();
}
finally
{
if (!moveNext.IsCompleted)
{
// The last moveNext must be completed before disposing.
// Cancel the enumerator, for more responsive completion.
// Surface any error through the
// TaskScheduler.UnobservedTaskException event.
// Avoid throwing on DisposeAsync.
try { linkedCts.Cancel(); }
catch (Exception ex) { _ = Task.FromException(ex); }
await Task.WhenAny(moveNext.AsTask()).ConfigureAwait(false);
}
await enumerator.DisposeAsync().ConfigureAwait(false);
}
}
}
用法示例:
await foreach (var batch in channel.Reader.ReadAllAsync().BufferImmediate())
{
await ConsumeBatch(batch);
}
上述实现是非破坏性的,这意味着已经从源序列中消耗掉的元素不会丢失。如果源序列失败或枚举被取消,任何缓冲的元素都将在错误传播之前发出。
注意:此实现在整个枚举期间使用相同的
List<TSource>
作为缓冲区。如果在枚举期间的某个时刻缓冲区变得过大,它将保持过大直到枚举结束。如果这是一个问题,您可以在发出每批次后创建一个新的List<TSource>
,或者查看