具有有限容量的变换块中的TPL数据流异常

问题描述 投票:9回答:1

我需要构建将处理许多消息的TPL数据流管道。因为有很多消息,所以我不能简单地将它们Post放入BufferBlock的无限队列中,否则我将面临内存问题。因此,我想使用BoundedCapacity = 1选项禁用队列,并使用MaxDegreeOfParallelism使用并行任务处理,因为我的TransformBlock可能需要花费一些时间来处理每条消息。我还使用PropagateCompletion完成所有操作,并且无法沿管道传播。

但是当第一条消息刚发生错误时,我正面临错误处理的问题:调用await SendAsync只是将我的应用切换为无限等待。

我已将案例简化为示例控制台应用程序:

var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
{
    BoundedCapacity = 1
});

var process_block = new ActionBlock<int>(x =>
{
    throw new InvalidOperationException();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 1
});

data_buffer.LinkTo(process_block,
    new DataflowLinkOptions { PropagateCompletion = true });

for (var k = 1; k <= 5; k++)
{
    await data_buffer.SendAsync(k);
    Console.WriteLine("Send: {0}", k);
}

data_buffer.Complete();

await process_block.Completion;
c# task-parallel-library tpl-dataflow
1个回答
7
投票

这是预期的行为。如果发生“下游”故障,则该错误不会在网格上“向后”传播。网格期望您检测到该故障(例如,通过process_block.Completion)并予以解决。

如果要向后传播错误,则可以在下游块故障的情况下,在上游块await上加一个process_block.Completion或在faults上继续。

请注意,这不是唯一可行的解​​决方案;您可能要重建网格的该部分或将源链接到替代目标。源块没有故障,因此它们可以继续使用已修复的网格进行处理。

© www.soinside.com 2019 - 2024. All rights reserved.