我有2个循环(嵌套),试图进行简单的并行化
伪代码:
for item1 in data1 (~100 million row)
for item2 in data2 (~100 rows)
result = process(item1,item2) // couple of if conditions
hashset.add(result) // while adding, incase of a duplicate i also decide wihch one to retain
[process(item1,item2)
精确地为4,如果条件基于item1和item2中的值。(花费的时间少于50ms)
data1
大小为Nx17data2
大小为Nx17result
大小为1x17(结果添加到字符串中,然后再添加到哈希集中)
最大输出大小:未知,但我想准备至少5亿这意味着该哈希集将容纳5亿个项目。 (我想如何在哈希集中处理这么多数据将是另一个问题)
我应该只使用concurrent hashset
使其线程安全并与parallel.each
一起使用,还是应该与TASK
概念一起使用
请根据您的意见提供一些代码示例。
答案在很大程度上取决于process(data1, data2)
的成本。如果这是占用大量CPU的操作,那么您肯定会从Parallel.ForEach
中受益。当然,您应该使用并发字典,或者锁定哈希表。您应该进行基准测试,以了解最适合您的方法。如果process
对性能的影响太小,那么您可能无法从并行化中得到任何好处-哈希表上的锁定将全部杀死。
您还应尝试查看在外循环上枚举data2是否也更快。它可能会给您带来另一个好处-您可以为data2的每个实例创建一个单独的哈希表,然后将结果合并到一个哈希表中。这样可以避免锁定。
同样,您需要进行测试,此处没有通用答案。
我的建议是将数据的处理与结果保存到HashSet
分开,因为第一个是可并行化的,而第二个则不是可并行化的。您可以使用BlockingCollection
和线程(或任务),使用生产者-消费者模式实现这种分离。但是,我将展示使用更专门的工具BlockingCollection
库的解决方案。我假设数据是两个整数数组,并且处理函数最多可以产生500,000,000个不同的结果:
TPL Dataflow
数据流管道将具有两个块。第一个块是var data1 = Enumerable.Range(1, 100_000_000).ToArray();
var data2 = Enumerable.Range(1, 100).ToArray();
static int Process(int item1, int item2)
{
return unchecked(item1 * item2) % 500_000_000;
}
,它接受TransformBlock
数组中的一项,用TransformBlock
数组中的所有项处理它,并返回一批结果(作为data1
数组)。
data2
第二个块是和int
,它从第一个块接收已处理的批次,并将各个结果添加到var processBlock = new TransformBlock<int, int[]>(item1 =>
{
int[] batch = new int[data2.Length];
for (int j = 0; j < data2.Length; j++)
{
batch[j] = Process(item1, data2[j]);
}
return batch;
}, new ExecutionDataflowBlockOptions()
{
BoundedCapacity = 100,
MaxDegreeOfParallelism = 3 // Configurable
});
中。
ActionBlock
下面的行将两个块链接在一起,因此数据将自动从第一个块流向第二个块:
ActionBlock
最后一步是向第一个块提供HashSet
数组的项目,并等待整个操作完成。
var results = new HashSet<int>();
var saveBlock = new ActionBlock<int[]>(batch =>
{
for (int i = 0; i < batch.Length; i++)
{
results.Add(batch[i]);
}
}, new ExecutionDataflowBlockOptions()
{
BoundedCapacity = 100,
MaxDegreeOfParallelism = 1 // Mandatory
});
processBlock.LinkTo(saveBlock,
new DataflowLinkOptions() { PropagateCompletion = true });
现在包含结果。
关于使用data1
选项的注意事项。此选项控制数据流,以便上游的快速块不会向下游的块填充数据。正确配置此选项可提高内存和CPU效率的管道。
TPL数据流库内置于.NET Core中,并且可作为.NET Framework的for (int i = 0; i < data1.Length; i++)
{
processBlock.SendAsync(data1[i]).Wait();
}
processBlock.Complete();
saveBlock.Completion.Wait();
使用。