无法运行TPL数据流管道。

问题描述 投票:0回答:1

我正试图使用 TPL Dataflow 我可以将消息存储在一个批处理块中,每当它的阈值达到时,它就会将数据发送到一个动作块中.我已经添加了一个缓冲块,以防动作块太慢。到目前为止,我已经尝试了所有可能的方法来将数据从第一个块移动到第二个块,但没有用。我已经链接了这些块,添加了 DataFlowLinkOptionsPropagateCompletion 设为 true. 我还需要做什么才能让这个管道工作?

管线

class LogPipeline<T>
{
    private ActionBlock<T[]> actionBlock;
    private BufferBlock<T> bufferBlock;
    private BatchBlock<T> batchBlock;
    private readonly Action<T[]> action;
    private readonly int BufferSize;
    private readonly int BatchSize;

    public LogPipeline(Action<T[]> action, int bufferSize = 4, int batchSize = 2)
    {
        this.BufferSize = bufferSize;
        this.BatchSize = batchSize;
        this.action = action;
    }
    private void Initialize()
    {
        this.bufferBlock = new BufferBlock<T>(new DataflowBlockOptions
            { TaskScheduler = TaskScheduler.Default,
            BoundedCapacity = this.BufferSize });
        this.actionBlock = new ActionBlock<T[]>(this.action);
        this.batchBlock = new BatchBlock<T>(BatchSize);
        this.bufferBlock.LinkTo(this.batchBlock, new DataflowLinkOptions
            { PropagateCompletion = true });
        this.batchBlock.LinkTo(this.actionBlock, new DataflowLinkOptions
            { PropagateCompletion = true });
    }
    public void Post(T log)
    {
        this.bufferBlock.Post(log);
    }
    public void Start()
    {
        this.Initialize();
    }
    public void Stop()
    {
        actionBlock.Complete();
    }
}

测试

[TestCase(100, 1000, 5)]
public void CanBatchPipelineResults(int batchSize, int bufferSize, int cycles)
{

    List<int> data = new List<int>();
    LogPipeline<int> logPipeline = new LogPipeline<int>(
       batchSize: batchSize,
       bufferSize: bufferSize,
       action: (logs) =>
       {
           data.AddRange(logs);
       });
    logPipeline.Start();

    int SelectWithEffect(int element)
    {
        logPipeline.Post(element);
        return 3;
    }
    int count = 0;
    while (true)
    {
        if (count++ > cycles)
        {
            break;
        }
        var sent = Parallel.For(0, bufferSize, (x) => SelectWithEffect(x));
    }
    logPipeline.Stop();
    Assert.IsTrue(data.Count == cycles * batchSize);
}

为什么除了缓冲区之外,我所有的块都是空的?我试过用 SendAsync 也无济于事。无论我怎么做,数据都不会从第一块移动到下一块。

我有带链接和不带链接的选项。

更新 我已经完全删除了管道,还删除了 Parallel.我尝试了各种输入块(batchbuffertransform),似乎没有办法让后续的块得到什么。我也试过用 await SendAsync 以及 Post. 我只试过内 unit tests 类。会不会是这个问题?

更新2我是错的复杂的东西 , 我已经尝试了一个更简单的例子 . 在一个testcase里面,即使这样也不工作。

List<int> items=new List<int>(); var tf=new TransformBlock<int,int>(x=>x+1); var action= new ActionBlock<int>(x=>items.Add(x)); tf.LinkTo(action, new DataFlowOptions{ PropagateCompletion=true}); tf.Post(3); //Breakpoint here

c# tpl-dataflow .net-core-3.1
1个回答
2
投票

在测试结束前似乎没有任何事情发生,原因是没有一个块有机会运行。代码 所有的CPU通过使用 Parallel.For 所以没有其他任务有机会运行。这意味着所有发布的消息仍然在第一块中。然后代码调用 Complete 的最后一个块,但甚至不等它处理完就检查结果。

这段代码可以简化很多。首先,所有的块都有输入缓冲区,它们不需要额外的缓冲。

管道可以只用这个.Pipeline来代替。

//Arrange
var list=new List<int>();

var head=new BatchBlock<int>(BatchSize);
var act=new ActionBlock<int[]>(nums=>list.AddRange(nums);

var options= new DataflowLinkOptions{ PropagateCompletion = true };
head.LinkTo(act);

//ACT

//Just fire everything at once, because why not
var tasks=Enumerable.Range(0,cycles)(
    i=>Task.Run(()=> head.Post(i)));
await tasks;

//Tell the head block we're done
head.Complete();
//Wait for the last block to complete
await act.Completion;

//ASSERT
Assert.Equal(cycles, data.Count);

没有必要创建一个复杂的类来封装流水线。它不会 "启动"--如果没有数据,块就什么都不做。为了抽象它,我们只需要提供对头部块和最后一个块的访问。Completion 任务


1
投票

通过调用 logPipeline.Stop 的数据后,立即向 BufferBlock,你正在完成 ActionBlock因此,它拒绝了所有的信息。BatchBlock 是想以后发给它。从文档中的 ActionBlock.Complete 方法,向数据流块发出信号,表明它不应该接受或产生更多的消息,也不应该消耗更多的延迟消息。

向数据流块发出信号,表明它不应该再接受或产生任何消息,也不应该再消耗任何推迟的消息。


更新了。 关于问题中的更新要求:

每当它的阈值被击中,它就会把数据发送到一个动作块。

....我的建议是将这个逻辑移入到 LogPipeline.Post 方法。该方法 BufferBlock.Post 返回 false 如果该块还没有接受发送到它的数据。

public void Post(T log)
{
    if (!this.bufferBlock.Post(log)) this.actionBlock.Post(log);
}
© www.soinside.com 2019 - 2024. All rights reserved.