如何在两个变换块都完成后重新编写代码完成的代码?我以为完成意味着将其标记为完成并且“出队列”为空?
public Test()
{
broadCastBlock = new BroadcastBlock<int>(i =>
{
return i;
});
transformBlock1 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("1 input count: " + transformBlock1.InputCount);
Thread.Sleep(50);
return ("1_" + i);
});
transformBlock2 = new TransformBlock<int, string>(i =>
{
Console.WriteLine("2 input count: " + transformBlock1.InputCount);
Thread.Sleep(20);
return ("2_" + i);
});
processorBlock = new ActionBlock<string>(i =>
{
Console.WriteLine(i);
});
//Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}
public void Start()
{
const int numElements = 100;
for (int i = 1; i <= numElements; i++)
{
broadCastBlock.SendAsync(i);
}
//mark completion
broadCastBlock.Complete();
processorBlock.Completion.Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
我编辑了代码,为每个变换块添加了输入缓冲区计数。显然,所有100个项目都流式传输到每个转换块。但是,一旦其中一个变换块完成,处理器块就不再接受任何其他项,而是不完整变换块的输入缓冲区仅刷新了输入缓冲区。
问题正是casperOne在回答中所说的。一旦第一个转换块完成,处理器块便进入“完成模式”:它将处理其输入队列中的剩余项目,但将不接受任何新项目。
虽然有一个比将处理器块一分为二更简单的解决方法:不设置PropagateCompletion
,而是在两个转换块都完成时手动设置处理器块的完成:
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
.ContinueWith(_ => processorBlock.Complete());
这里的问题是,每次调用PropagateCompletion
property来链接这些块时,都要设置PropagateCompletion
,并且转换块中的等待时间也不同。
摘自LinkTo
method上的LinkTo
的文档(重点是我的:]
它向IDataflowBlock发送的信号不应接受也不产生任何其他消息也不要消耗任何其他已延迟的消息。
由于在每个Complete
method实例中错开了等待时间,所以Complete
(等待20 ms)在IDataflowBlock
interface(等待50 ms)之前完成。 IDataflowBlock
首先完成,然后将信号发送到TransformBlock<TInput, TOutput>
,然后说“我不接受其他任何东西”(并且TransformBlock<TInput, TOutput>
尚未产生所有消息)。
请注意,transformBlock2
之前的transformBlock1
的处理不能绝对;线程池(假设您使用的是默认调度程序)将以不同的顺序处理任务是可行的(但很有可能不会,因为一旦完成20 ms的工作,它将从队列中窃取工作)。] >
您的管道如下所示:
transformBlock2
为了解决这个问题,您想要一个看起来像这样的管道:
processorBlock
仅通过创建两个单独的
transformBlock1
实例即可完成,如下所示:
transformBlock1
然后您需要等待两个处理器模块,而不是仅一个:
重要说明;创建transformBlock1
这里非常
broadcastBlock
/ \
transformBlock1 transformBlock2
\ /
processorBlock
时,默认值是将传递给它的 broadcastBlock
/ \
transformBlock1 transformBlock2
| |
processorBlock1 processorBlock2
实例上的ActionBlock<TInput>
设置为1。这意味着您传递给ActionBlock<TInput>
的// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);
// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);
// Linking
broadCastBlock.LinkTo(transformBlock1,
new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2,
new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1,
new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2,
new DataflowLinkOptions { PropagateCompletion = true });
的调用是线程安全的,一次只能执行一个。
因为现在有two
个Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();
实例指向同一个ActionBlock<TInput>
委托,所以不能保证线程安全。如果您的方法是线程安全的,则您无需执行任何操作(由于没有理由阻止,因此您可以将MaxDegreeOfParallelism
property属性设置为MaxDegreeOfParallelism
。
如果它是not
线程安全的,并且需要保证它是安全的,则需要使用传统的同步原语,例如ExecutionDataflowBlockOptions
。在这种情况下,您可以这样做(尽管显然不需要,因为ExecutionDataflowBlockOptions
上的Action<T>
delegate是线程安全的:]
Action<T>
svick的回答的补充:为了与通过PropagateCompletion选项获得的行为保持一致,如果前一个块发生故障,还需要转发异常。像下面这样的扩展方法也可以解决这个问题:
其他答案非常清楚,当一个块具有两个以上的源时,为什么PropagateCompletion = true会使事情搞砸了。
[此方法在功能上等同于pkt的lock
statement方法,但代码略少: