哈希集的简单并行化

问题描述 投票:0回答:2

我有2个循环(嵌套),试图进行简单的并行化

伪代码

for item1 in data1 (~100 million row)
    for item2 in data2 (~100 rows)
        result = process(item1,item2) // couple of if conditions
        hashset.add(result) // while adding, incase of a duplicate i also decide wihch one to retain

[process(item1,item2)精确地为4,如果条件基于item1和item2中的值。(花费的时间少于50ms)

data1大小为Nx17data2大小为Nx17result大小为1x17(结果添加到字符串中,然后再添加到哈希集中)

最大输出大小:未知,但我想准备至少5亿这意味着该哈希集将容纳5亿个项目。 (我想如何在哈希集中处理这么多数据将是另一个问题)

我应该只使用concurrent hashset使其线程安全并与parallel.each一起使用,还是应该与TASK概念一起使用

请根据您的意见提供一些代码示例。

c# task-parallel-library hashset parallel.foreach
2个回答
2
投票

答案在很大程度上取决于process(data1, data2)的成本。如果这是占用大量CPU的操作,那么您肯定会从Parallel.ForEach中受益。当然,您应该使用并发字典,或者锁定哈希表。您应该进行基准测试,以了解最适合您的方法。如果process对性能的影响太小,那么您可能无法从并行化中得到任何好处-哈希表上的锁定将全部杀死。

您还应尝试查看在外循环上枚举data2是否也更快。它可能会给您带来另一个好处-您可以为data2的每个实例创建一个单独的哈希表,然后将结果合并到一个哈希表中。这样可以避免锁定。

同样,您需要进行测试,此处没有通用答案。


0
投票

我的建议是将数据的处理与结果保存到HashSet分开,因为第一个是可并行化的,而第二个则不是可并行化的。您可以使用BlockingCollection和线程(或任务),使用生产者-消费者模式实现这种分离。但是,我将展示使用更专门的工具BlockingCollection库的解决方案。我假设数据是两个整数数组,并且处理函数最多可以产生500,000,000个不同的结果:

TPL Dataflow

数据流管道将具有两个块。第一个块是var data1 = Enumerable.Range(1, 100_000_000).ToArray(); var data2 = Enumerable.Range(1, 100).ToArray(); static int Process(int item1, int item2) { return unchecked(item1 * item2) % 500_000_000; } ,它接受TransformBlock数组中的一项,用TransformBlock数组中的所有项处理它,并返回一批结果(作为data1数组)。

data2

第二个块是和int,它从第一个块接收已处理的批次,并将各个结果添加到var processBlock = new TransformBlock<int, int[]>(item1 => { int[] batch = new int[data2.Length]; for (int j = 0; j < data2.Length; j++) { batch[j] = Process(item1, data2[j]); } return batch; }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 100, MaxDegreeOfParallelism = 3 // Configurable }); 中。

ActionBlock

下面的行将两个块链接在一起,因此数据将自动从第一个块流向第二个块:

ActionBlock

最后一步是向第一个块提供HashSet数组的项目,并等待整个操作完成。

var results = new HashSet<int>();
var saveBlock = new ActionBlock<int[]>(batch =>
{
    for (int i = 0; i < batch.Length; i++)
    {
        results.Add(batch[i]);
    }
}, new ExecutionDataflowBlockOptions()
{
    BoundedCapacity = 100,
    MaxDegreeOfParallelism = 1 // Mandatory
});

processBlock.LinkTo(saveBlock, new DataflowLinkOptions() { PropagateCompletion = true }); 现在包含结果。

关于使用data1选项的注意事项。此选项控制数据流,以便上游的快速块不会向下游的块填充数据。正确配置此选项可提高内存和CPU效率的管道。

TPL数据流库内置于.NET Core中,并且可作为.NET Framework的for (int i = 0; i < data1.Length; i++) { processBlock.SendAsync(data1[i]).Wait(); } processBlock.Complete(); saveBlock.Completion.Wait(); 使用。

© www.soinside.com 2019 - 2024. All rights reserved.