TPL数据流C#等待所有链接块完成。

问题描述 投票:1回答:1

我正在使用TPL Dataflow建立一个管道。这个流水线在逻辑上应该做到以下几点。

  1. 从处理多个数据项开始--比如说... pollingBlock.
  2. 在满足某些条件的情况下,通过 一个 的项目(符合条件的)到某个区块进行进一步监测,比方说是 monitoringBlock. 每个 monitoringBlock 只能容纳1件物品,但有多个 monitoringBlocks.
  3. pollingBlock 应该继续处理所有的项目,包括一个发布在 while (true) 的方式。
  4. monitoringBlocks 在被占用的情况下,不应该接受任何其他的消息,这些消息应该只是被删除,而不做进一步的处理。
  5. 经过一段时间的处理,在 monitoringBlock 消息应该被标记为已完成或转移到下一个块进行处理,这个下一个块就是 processingBlock

一个简单的样本。

public Task ExecutePipeline()
{
    var block = CreatePollingPipeline();
    block.Post((_serviceOne, _serviceTwo));

    block.Complete();
    return block.Completion;
}

public ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)> CreatePollingPipeline()
{
    var pollingAlertHolder = new BufferBlock<(string input1, string input2)>();

    var pollingBlock = new ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)>(services =>
    {
        while (true)
        {
            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP2", "INPVAL2"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP2", "INPVAL2"));
            Thread.Sleep(2000);
        }
    });

    var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
        {
            Console.WriteLine("monitoringBlock started");
            Thread.Sleep(5000);
            Console.WriteLine("monitoringBlock completed");

            return (inputs.input1, inputs.input2);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

    pollingAlertHolder.LinkTo(monitoringBlock, new DataflowLinkOptions() { PropagateCompletion = true },
        inputs => inputs.input1 == "INP1" && inputs.input2 == "INPVAL");
    pollingAlertHolder.LinkTo(DataflowBlock.NullTarget<(string input1, string input2)>());

    var processingBlock = new ActionBlock<(string input1, string input2)>(i =>
    {
        Console.WriteLine("processingBlock started");
        Thread.Sleep(2000);
        Console.WriteLine("processingBlock completed");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
    monitoringBlock.LinkTo(processingBlock, new DataflowLinkOptions { PropagateCompletion = true });


    return pollingBlock;
}

我的问题是,我如何保持 monitoringBlock 占用 processingBlock 完成它的工作?我不希望任何项目被发布到... monitoringBlock 訊息還沒說完 处理周期。

c# .net-core async-await task-parallel-library tpl-dataflow
1个回答
1
投票

正如在评论中已经提到的,你可以简单地封装逻辑的 monitoringBlockprocessingBlock 在一个区块中,例如,你可以通过预定义的 Datablock.Encapsulate 方法。

但是,如果你不想这样做,你可以使用 AutoResetEvent 或类似的抽象,你的代码可以是这样的。

AutoResetEvent dataflowEvent = new AutoResetEvent(true);
var bufferBlock = new ActionBLock<(string input1, string input2)>(i =>
{
    dataflowEvent.WaitOne();
    monitoringBlock.Post(i);
});
var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
    {
        Console.WriteLine("monitoringBlock started");
        Thread.Sleep(5000);
        Console.WriteLine("monitoringBlock completed");

        dataflowEvent.Set();
        return (inputs.input1, inputs.input2);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
© www.soinside.com 2019 - 2024. All rights reserved.