What is the effect of using AsParallel() and AsSequential() in the same query? C#

Question (votes: 1, answers: 1)

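I was reading about PLINQ in a book, and it said:

If you have a complex query that can benefit from parallel processing but also has some parts that should be done sequentially, you can use AsSequential to stop your query from being processed in parallel.

For example: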

var parallelResult = numbers.AsParallel().AsOrdered()
    .Where(i => i % 2 == 0).AsSequential();

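I want to understand why this is allowed and what effect it has on the result. Does it run in parallel? Does it run sequentially? Right now it doesn't make any sense to me.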

c# .net visual-studio linq plinq
1 answer
Votes: 0

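You can conceptualize a LINQ query as an atomic construct with a single execution plan, but it may be more helpful to think of it as a pipeline composed of multiple dataflow blocks. The output of each block becomes the input of the next block in the pipeline, and the blocks work concurrently, each processing items as soon as they become available. Take the following query as an example. It consists of two "blocks", represented by the two Select operators. The first block is configured to process 3 items at a time (in parallel), while the second block is configured to process the items one at a time (sequentially). Processing an item takes 1000 msec in the parallel block and 500 msec in the sequential block: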

using System;
using System.Linq;
using System.Threading;

var results = Enumerable.Range(1, 10)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(3)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
            + $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
        Thread.Sleep(1000); // Simulate some CPU-bound work
        return x;
    })
    .AsSequential()
    .Select(x =>
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
            + $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
        Thread.Sleep(500); // Simulate some CPU-bound work
        return x;
    })
    .ToArray();
Console.WriteLine($"Results: {String.Join(", ", results)}");

If you run this code, you'll get output like this:

08:32:17.628 [4] Parallel #2
08:32:17.628 [5] Parallel #1
08:32:17.628 [6] Parallel #3
08:32:18.642 [6] Parallel #5
08:32:18.642 [5] Parallel #4
08:32:18.644 [4] Parallel #6
08:32:18.651 [1] Sequential #1
08:32:19.644 [6] Parallel #7
08:32:19.645 [4] Parallel #8
08:32:19.646 [5] Parallel #9
08:32:19.654 [1] Sequential #2
08:32:20.156 [1] Sequential #3
08:32:20.648 [4] Parallel #10
08:32:20.658 [1] Sequential #4
08:32:21.161 [1] Sequential #5
08:32:21.663 [1] Sequential #6
08:32:22.164 [1] Sequential #7
08:32:22.672 [1] Sequential #8
08:32:23.173 [1] Sequential #9
08:32:23.675 [1] Sequential #10
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10

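Notice how the sequential processing starts before all of the parallel processing has completed. To achieve this effect I used the configuration option WithMergeOptions(ParallelMergeOptions.NotBuffered), which minimizes the buffering of the first block's output. The other options are described in the documentation of the ParallelMergeOptions enum.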
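To make the role of the merge option and of AsSequential() easier to observe, here is a minimal sketch that runs the same two-stage query once with NotBuffered and once with FullyBuffered. The helper Run and its event log are hypothetical names introduced only for this illustration, and the sleep durations are shortened. It records whether the sequential stage started before the parallel stage had finished, and which threads executed the sequential stage.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

// Hypothetical helper: runs the two-stage query with the given merge option.
// Returns the results, whether the sequential stage started before the
// parallel stage had finished, and the thread ids seen by the sequential stage.
(int[] Results, bool Overlapped, int[] SeqThreads) Run(ParallelMergeOptions merge)
{
    var log = new ConcurrentQueue<char>();
    var seqThreads = new ConcurrentDictionary<int, bool>();

    var results = Enumerable.Range(1, 10)
        .AsParallel()
        .AsOrdered()
        .WithDegreeOfParallelism(3)
        .WithMergeOptions(merge)
        .Select(x =>
        {
            Thread.Sleep(100);
            log.Enqueue('P'); // a parallel item has finished
            return x;
        })
        .AsSequential() // everything below is plain LINQ to Objects
        .Select(x =>
        {
            log.Enqueue('S'); // a sequential item has started
            seqThreads.TryAdd(Environment.CurrentManagedThreadId, true);
            Thread.Sleep(50);
            return x;
        })
        .ToArray();

    var events = log.ToArray();
    // True when the first 'S' was logged before the last 'P'.
    bool overlapped = Array.IndexOf(events, 'S') < Array.LastIndexOf(events, 'P');
    return (results, overlapped, seqThreads.Keys.ToArray());
}

foreach (var option in new[] { ParallelMergeOptions.NotBuffered, ParallelMergeOptions.FullyBuffered })
{
    var run = Run(option);
    Console.WriteLine($"{option}: results = {string.Join(", ", run.Results)}");
    Console.WriteLine($"  sequential stage started early: {run.Overlapped}");
    Console.WriteLine($"  sequential stage thread ids: {string.Join(", ", run.SeqThreads)}");
}
```

With FullyBuffered the sequential stage should never start early, because all results are accumulated before any of them is handed to the consumer. With NotBuffered it usually does start early, although the exact interleaving depends on scheduling. In both cases the sequential stage should report a single thread id, the one of the thread that enumerates the query, since the operators after AsSequential() are ordinary LINQ to Objects operators.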

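For completeness, here is the same query rewritten with the TPL Dataflow library. The code becomes more verbose and less fluent, but the execution control becomes more precise, and asynchronous workflows become available too (PLINQ is not async-friendly):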

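using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var block1 = new TransformBlock<int, int>(async x =>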
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
        + $" [{Thread.CurrentThread.ManagedThreadId}] Parallel #{x}");
    await Task.Delay(1000); // Simulate some I/O operation
    return x;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 3,
    EnsureOrdered = true // redundant since EnsureOrdered is the default
});

var block2 = new TransformBlock<int, int>(async x =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff}"
        + $" [{Thread.CurrentThread.ManagedThreadId}] Sequential #{x}");
    await Task.Delay(500); // Simulate some I/O operation
    return x;
}); // MaxDegreeOfParallelism = 1 is the default

block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });

// Feeding the first block
foreach (var x in Enumerable.Range(1, 10))
{
    await block1.SendAsync(x);
}
block1.Complete();

var results = new List<int>(); // Collecting the results is a bit painful
while (await block2.OutputAvailableAsync())
{
    while (block2.TryReceive(out var result))
    {
        results.Add(result);
    }
}
await block2.Completion;
Console.WriteLine($"Results: {String.Join(", ", results)}");
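
The result-collecting loop can probably be shortened on newer runtimes. Below is a minimal sketch, assuming the ReceiveAllAsync extension method of the TPL Dataflow library is available (to my knowledge it exists on .NET Core 3.0 and later). It uses a single block with a shortened delay and a made-up transformation (x * 10), just to keep the example small.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var block = new TransformBlock<int, int>(async x =>
{
    await Task.Delay(50); // Simulate some I/O operation
    return x * 10;        // Illustrative transformation, not part of the original query
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

// Feeding the block
foreach (var x in Enumerable.Range(1, 10))
{
    await block.SendAsync(x);
}
block.Complete();

// Collecting the results as an asynchronous stream
var results = new List<int>();
await foreach (var item in block.ReceiveAllAsync())
{
    results.Add(item);
}
await block.Completion; // Propagates any exception thrown inside the block

Console.WriteLine($"Results: {string.Join(", ", results)}");
```

Awaiting the Completion task after the loop is still needed, because the asynchronous stream simply ends when the block completes and, as far as I know, does not surface the block's exception by itself.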