通过BufferBlock的反压不工作。(C# TPL Dataflow)

问题描述 投票:0回答:1

典型情况。生产者快,消费者慢,需要让生产者慢下来. 不像我所期望的那样工作的示例代码(下面解释)。

//  I assumed this block will behave like BlockingCollection, but it doesn't 
var bb = new BufferBlock<int>(new DataflowBlockOptions {
    BoundedCapacity = 3, // looks like this does nothing
});

// fast producer
int dataSource = -1;
var producer = Task.Run(() => {
    while (dataSource < 10) {
        var message = ++dataSource;
        bb.Post(message);
        Console.WriteLine($"Posted: {message}");
    }
    Console.WriteLine("Calling .Complete() on buffer block");
    bb.Complete();
});

// slow consumer
var ab = new ActionBlock<int>(i => {
    Thread.Sleep(500);
    Console.WriteLine($"Received: {i}");
}, new ExecutionDataflowBlockOptions {
    MaxDegreeOfParallelism = 2,
});

bb.LinkTo(ab);

ab.Completion.Wait();

我以为这段代码可以正常工作,但它并没有工作。

  • BufferBlock bb 这是一个容量为3的阻塞队列。 一旦容量达到,生产者不应该能够。.Post() 到它,直到有一个空位。
    • 不是这样的。bb 似乎很乐意接受任何数量的消息。
  • producer 是一个快速发布消息的任务。一旦所有的消息都被发布,就会被调用到 bb.Complete() 应该在管道中传播,一旦所有消息处理完毕,就会发出关闭信号。因此,等待 ab.Completion.Wait(); 在最后。
    • 也不行。只要 .Complete() 被调用,动作块 ab 不会再收到任何信息。

可以用 BlockingCollection我认为在TPL数据流(TDF)的世界中 BufferBlock 是相当于。我想我误解了TPL Dataflow中的背压是如何工作的。

那么问题出在哪里呢?如何运行这个管道,不允许缓冲区内的消息超过3条。bb,并等待其完成?

PS:我发现这个要点(https:/gist.github.commnadeldf2ec09fe7eae9ba8938。),在这里建议保持一个旗语来阻止向 BufferBlock. 我以为这是 "内置 "的。

接受回答后的更新。

接受答案后更新。

如果你在看这道题的时候,你需要记住的是 ActionBlock 也有自己的输入缓冲区。

这是一个人的。然后你还需要意识到,因为所有的块都有自己的输入缓冲区,所以你不需要在这里使用 BufferBlock 因为你可能认为它的名字意味着什么。A BufferBlock 更像是一个实用块,用于更复杂的架构,或者像一个平衡加载块。但它不是一个背压缓冲器。

完成传播需要在链路级别明确地定义。

当调用 .LinkTo() 需明传 new DataflowLinkOptions {PropagateCompletion = true} 作为第2个论点。

c# task-parallel-library dataflow tpl-dataflow
1个回答
5
投票

要引入背压,你需要使用 SendAsync 当您将物品发送到区块中时。这让你的制作人可以等待区块准备好物品。你要找的就是这样的东西。

class Program
{
    static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 3
        };
        var block = new ActionBlock<int>(async i =>
        {
            await Task.Delay(100);
            Console.WriteLine(i);
        }, options);

        //Producer
        foreach (var i in Enumerable.Range(0, 10))
        {
            await block.SendAsync(i);
        }

        block.Complete();
        await block.Completion;
    }
}

如果你把这个改成 Post 的结果,并打印出 Post 你会发现很多项目都无法传递到块中。

class Program
{
    static async Task Main()
    {
        var options = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 1
        };
        var block = new ActionBlock<int>(async i =>
        {
            await Task.Delay(1000);
            Console.WriteLine(i);
        }, options);

        //Producer
        foreach (var i in Enumerable.Range(0, 10))
        {
            var result = block.Post(i);
            Console.WriteLine(result);
        }

        block.Complete();
        await block.Completion;
    }
}

Output:

True
False
False
False
False
False
False
False
False
False
0

1
投票

在JSteward答案的指导下,我想出了下面的代码,它在处理这些项目的同时产生(读取等)新的项目,保持一个读前缓冲区,当 "生产者 "没有更多的项目时,完成信号会被发送到链的头部,程序也在等待整个链的完成,然后终止。

static async Task Main() {

    string Time() => $"{DateTime.Now:hh:mm:ss.fff}";

    // the buffer is added to the chain just for demonstration purposes
    // the chain would work fine using just the built-in input buffer
    // of the `action` block.
    var buffer = new BufferBlock<int>(new DataflowBlockOptions {BoundedCapacity = 3});

    var action = new ActionBlock<int>(async i =>
    {
        Console.WriteLine($"[{Time()}]: Processing: {i}");
        await Task.Delay(500);
    }, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 2, BoundedCapacity = 2});

    // it's necessary to set `PropagateCompletion` property
    buffer.LinkTo(action, new DataflowLinkOptions {PropagateCompletion = true});

    //Producer
    foreach (var i in Enumerable.Range(0, 10))
    {
        Console.WriteLine($"[{Time()}]: Ready to send: {i}");
        await buffer.SendAsync(i);
        Console.WriteLine($"[{Time()}]: Sent: {i}");
    }

    // we call `.Complete()` on the head of the chain and it's propagated forward
    buffer.Complete(); 

    await action.Completion;
}
© www.soinside.com 2019 - 2024. All rights reserved.