生产者消费者集合,具有读取和写入批量数据的能力

问题描述 投票:0回答:2

我正在寻找像BufferBlock这样的系列

但用以下方法:

SendAsync<T>(T[])
T[] ReceiveAsync<T>()

有人可以帮忙吗?

c# multithreading task-parallel-library producer-consumer
2个回答
1
投票

这些方法不可用,SendAsync<T>只需要一个TRecieveAsync<T>只返回一个T,而不是数组。

SendAsync<T>(T[])
T[] ReceiveAsync<T>()

然而,有TryReceiveAll<T>(out IList<T> items),您可以在循环中调用SendAsync<T>将数组发送到BufferBlock或编写自己的扩展方法,如下所示:

public static async Task SendAllAsync<T>(this ITargetBlock<T> block, IEnumerable<T> items)
{
    foreach(var item in items)
    {
         await block.SendAsync(item)
    }
}

请注意,SendAsync确实返回一个表示接受消息的bool,你可以返回一个布尔数组,或者如果它们中的任何一个返回false则返回,但这取决于你。

可能会更容易使用BatchBlock<T>,您可以使用循环将项目作为单个项目发送,但批量发出项目,如果您正在构建管道,这将比使用TryRecieveAll更容易。 BatchBlock WalkthroughBatchBlock Example


0
投票

ReceiveAsyncSendAsync可用作ISourceBlock和ITargetBlockT <>接口的扩展方法。这意味着您必须将块转换为这些接口才能使用扩展方法,例如:

var buffer=new BufferBlock<string>();

var source=(ISourceBlock<string>)buffer;
var target=(ITargetBlock<string>)buffer;

await target.SendAsync("something");

通常这不是问题,因为所有Dataflow方法都接受接口,而不是具体类型,例如:

async Task MyProducer(ITargetBlock<string> target)
{
    ...
    await target.SendAsync(..);
    ...
    target.Complete();
}

async Task MyConsumer(ISourceBlock<string> target)
{
    ...
    var message=await target.ReceiveAsync();
    ...
}

public static async Task Main()
{
    var buffer=new BufferBlock<string>();
    MyProducer(buffer);
    await MyConsumer(buffer);
}
© www.soinside.com 2019 - 2024. All rights reserved.