TPL数据流加速?

问题描述 投票:5回答:4

我想知道以下代码是否可以优化以更快地执行。目前,在一个非常简单的数据流结构上,我目前似乎每秒最多可以收到140万条简单消息。我知道此示例过程同步地传递/转换消息,但是,我目前正在测试TPL Dataflow,以替代基于任务和并发集合的自定义解决方案。我知道“并发”一词已经表明我可以并行运行事物,但是出于当前测试的目的,我通过同步将消息推送到自己的解决方案上,每秒获得约510万条消息。我在这里所缺少的是,我读到TPL Dataflow被推为高吞吐量,低延迟的解决方案,但是到目前为止,我必须忽略性能调整。有人能指出我正确的方向吗?

class TPLDataFlowExperiments
{
    public TPLDataFlowExperiments()
    {
        var buf1 = new BufferBlock<int>();

        var transform = new TransformBlock<int, string>(t =>
            {
                return "";
            });

        var action = new ActionBlock<string>(s =>
            {
                //Thread.Sleep(100);
                //Console.WriteLine(s);
            });

        buf1.LinkTo(transform);
        transform.LinkTo(action);

        //Propagate all Completions down the flow
        buf1.Completion.ContinueWith(t =>
        {
            transform.Complete();
            transform.Completion.ContinueWith(u =>
            {
                action.Complete();
            });
        });

        Stopwatch watch = new Stopwatch();
        watch.Start();

        int cap = 10000000;
        for (int i = 0; i < cap; i++)
        {
            buf1.Post(i);
        }

        //Mark Buffer as Complete
        buf1.Complete();

        action.Completion.ContinueWith(t =>
            {
                watch.Stop();

                Console.WriteLine("All Blocks finished processing");
                Console.WriteLine("Units processed per second: " + cap / watch.ElapsedMilliseconds * 1000);
            });

        Console.ReadLine();
    }
}
c# concurrency task-parallel-library tpl-dataflow
4个回答
9
投票

我认为这主要归结为一件事:您的测试几乎毫无意义。所有这些块都应该做某事,并使用多个内核和异步操作来做到这一点。

而且,在您的测试中,同步上可能花费了大量时间。使用更实际的代码,代码将花费一些时间来执行,因此争用将减少,因此实际开销将小于您所测量的开销。

但是要真正回答您的问题,是的,您忽略了一些性能调整。具体来说,SingleProducerConstrained,意味着可以使用锁定较少的数据结构。如果我在两个模块上都使用了它(SingleProducerConstrained在这里完全没用,您可以放心地删除它),则速率将从每秒约3-4百万个项目提高到计算机上的500万个以上。


2
投票

为了补充svick的答案,测试仅对单个操作块使用单个处理线程。这样,它只测试使用块的开销。

DataFlow以类似于F#代理,Scala actor和MPI实现的方式工作。每个动作块一次执行一个任务,监听输入并产生输出。通过分步执行可在多个内核上独立执行的算法(仅将消息彼此传递)来提供加速。

虽然您可以增加并发任务的数量,但是最重要的问题是设计一种流程,该流程独立于其他步骤执行最大数量的步骤。


0
投票

您还可以增加数据流块的并行度。如果您发现其中一个块成为其余块的瓶颈,则这可能会提供额外的加速,并且还有助于线性任务之间的负载平衡。


0
投票

如果您的工作负载非常细微,以至于您希望每秒处理数百万条消息,那么由于相关的开销,通过管道传递单个消息变得不可行。您需要通过将消息批处理到数组或列表中来分块工作量。例如:

BufferBlock

对于批处理输入,您可以使用var transform = new TransformBlock<int[], string[]>(batch => { var results = new string[batch.Length]; for (int i = 0; i < batch.Length; i++) { results[i] = ProcessItem(batch[i]); } return results; }); 包中的BatchBlock或“ linqy” BatchBlock扩展方法,或使用Buffer包中的功能类似的Buffer方法,也可以使用手动。

© www.soinside.com 2019 - 2024. All rights reserved.