这是TPL Dataflow的工作吗?

问题描述 投票:11回答:2

我在不同的任务上运行了非常典型的生产者/消费者模型。

Task1:从二进制文件中读取一批byte [],并为每个字节数组集合启动一个新任务。 (该操作是批处理的,用于内存管理)。

任务2-n:这些是辅助任务,每个任务都对传入的字节数组集合(来自Tasks1)进行操作,并对字节数组进行反序列化,然后按特定条件对它们进行排序,然后存储结果对象的集合(每个对象并发字典中将字节数组反序列化为此类对象。

任务(n + 1),我选择了并发字典,因为此任务的工作是按照与它们从Task1产生的顺序相同的顺序合并存储在并发字典中的那些集合。我通过将collectionID(它的类型为int,并对于Task1中的每个新集合递增)来实现这一点,从Task1一直向下传递到此任务。此任务基本上检查下一个预期的collectionID是否已存储在并发字典中,如果是,则将其取出,将其添加到Final Queue中,并检查并发字典中的下一个集合。

现在,从我阅读的内容和观看的视频中,我看来,TPL Dataflow可能是这种生产者/消费者模型的理想人选。因为我从未使用过TPL Dataflow,所以我似乎无法设计出一个这样的入门书。在吞吐量和延迟方面,该库是否可以胜任任务?我目前处理250万个字节数组,因此每秒处理结果集合中的对象。 TPL数据流可以帮助简化吗?我对以下问题的答案特别感兴趣:当产生工作任务并在工作任务完成工作后重新合并它们时,TPL Dataflow是否可以保留Task1的收集批处理顺序?它优化了事情吗?概述了整个结构后,我感到由于旋转和涉及太多并发集合而浪费了很多时间。

有什么想法,想法吗?

c# concurrency task-parallel-library producer-consumer tpl-dataflow
2个回答
11
投票

EDIT:原来我错了。TransformBlockdoes会以进来的顺序返回项目,即使已将其配置为并行处理也是如此。因此,我原始答案中的代码完全没有用,可以使用普通的TransformBlock代替。


原始答案:

据我所知,.Net中只有一个并行结构支持按处理顺序返回处理过的项:带有AsOrdered()的PLINQ。但是在我看来,PLINQ与您想要的不匹配。

TPL数据流,另一方面,我认为很合适,但是它没有一个支持并行性并同时按顺序返回项目的块(AsOrdered()支持这两者,但不支持同时)。幸运的是,数据流模块在设计时就考虑到了可组合性,因此我们可以构建自己的模块来做到这一点。

但是首先,我们必须弄清楚如何对结果进行排序。像您建议的那样,使用并发字典以及一些同步机制肯定可以。但是我认为有一个更简单的解决方案:使用TransformBlock队列。在输出任务中,将Task出队,等待(异步)完成,然后将其结果发送出去。对于队列为空的情况,我们仍然需要一些同步,但是如果我们选择巧妙地使用哪个队列,我们​​可以免费获得同步。

所以,一般的想法是这样的:我们正在写的将是Task,带有一些输入和一些输出。创建自定义IPropagatorBlock的最简单方法是创建一个处理输入的块,另一个生成结果的块,并使用IPropagatorBlock将其视为一个。

输入块将必须以正确的顺序处理传入的项目,因此在那里没有并行化。它将创建一个新的DataflowBlock.Encapsulate()(实际上是一个DataflowBlock.Encapsulate(),以便我们稍后可以设置Task的结果),将其添加到队列中,然后将项目发送给处理,以及一些设置方法正确TaskCompletionSource的结果。因为我们不需要将此块链接到任何东西,所以我们可以使用TaskCompletionSource

输出块将必须从队列中获取Task,异步等待它们,然后将它们发送出去。但是,由于所有块中都嵌入了队列,并且接受委托的块具有内置的异步等待,因此这非常简单:Task。该块既可以用作队列,也可以用作输出块。因此,我们不必处理任何同步。

难题的最后一步实际上是并行处理项目。为此,我们可以使用另一个ActionBlock,这次设置为Task。它将接受输入,对其进行处理,并在队列中设置正确的new TransformBlock<Task<TOutput>, TOutput>(t => t)的结果。

放在一起,看起来可能像这样:

ActionBlock

经过如此多的讨论,我认为这是相当少的代码。

似乎您非常在乎性能,因此您可能需要微调此代码。例如,将MaxDegreeOfParallelism块的Task设置为public static IPropagatorBlock<TInput, TOutput> CreateConcurrentOrderedTransformBlock<TInput, TOutput>( Func<TInput, TOutput> transform) { var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t); var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>( tuple => tuple.Item2(transform(tuple.Item1)), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded }); var enqueuer = new ActionBlock<TInput>( async item => { var tcs = new TaskCompletionSource<TOutput>(); await processor.SendAsync( new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult)); await queue.SendAsync(tcs.Task); }); enqueuer.Completion.ContinueWith( _ => { queue.Complete(); processor.Complete(); }); return DataflowBlock.Encapsulate(enqueuer, queue); } 之类可能是有道理的,以避免过度订阅。另外,如果延迟对您来说比吞吐量更重要,则可以将同一块的MaxDegreeOfParallelism设置为1(或另一个小数字),这样,当项目处理完成时,它会立即发送到输出。

此外,如果要限制传入的项目,则可以设置processorEnvironment.ProcessorCount


0
投票

是,Environment.ProcessorCount库非常适合此工作。它支持您需要的所有功能:MaxMessagesPerTaskBoundedCapacityenqueuer。但是使用TPL Dataflow选项需要注意一些细节。

首先,必须确保使用MaxDegreeOfParallelism方法在管道中填充第一个块。否则,如果使用MaxDegreeOfParallelism方法并忽略其返回值,则可能会丢失消息。 BoundedCapacity永远不会丢失消息,因为它异步阻塞了调用者,直到块的内部缓冲区中有用于传入消息的可用空间为止。

其次,您必须确保下游块中的可能异常不会无限期地阻塞供料器,等待永远不会出现的可用空间。没有内置的方法可以通过配置块自动完成此操作。相反,您必须手动将下游块的完成传播到上游块。这是下面示例中方法BoundedCapacity的目的:

EnsureOrdered
© www.soinside.com 2019 - 2024. All rights reserved.