我想知道以下代码是否可以优化以更快地执行。目前,在一个非常简单的数据流结构上,我目前似乎每秒最多可以收到140万条简单消息。我知道此示例过程同步地传递/转换消息,但是,我目前正在测试TPL Dataflow,以替代基于任务和并发集合的自定义解决方案。我知道“并发”一词已经表明我可以并行运行事物,但是出于当前测试的目的,我通过同步将消息推送到自己的解决方案上,每秒获得约510万条消息。我在这里所缺少的是,我读到TPL Dataflow被推为高吞吐量,低延迟的解决方案,但是到目前为止,我必须忽略性能调整。有人能指出我正确的方向吗?
class TPLDataFlowExperiments
{
public TPLDataFlowExperiments()
{
var buf1 = new BufferBlock<int>();
var transform = new TransformBlock<int, string>(t =>
{
return "";
});
var action = new ActionBlock<string>(s =>
{
//Thread.Sleep(100);
//Console.WriteLine(s);
});
buf1.LinkTo(transform);
transform.LinkTo(action);
//Propagate all Completions down the flow
buf1.Completion.ContinueWith(t =>
{
transform.Complete();
transform.Completion.ContinueWith(u =>
{
action.Complete();
});
});
Stopwatch watch = new Stopwatch();
watch.Start();
int cap = 10000000;
for (int i = 0; i < cap; i++)
{
buf1.Post(i);
}
//Mark Buffer as Complete
buf1.Complete();
action.Completion.ContinueWith(t =>
{
watch.Stop();
Console.WriteLine("All Blocks finished processing");
Console.WriteLine("Units processed per second: " + cap / watch.ElapsedMilliseconds * 1000);
});
Console.ReadLine();
}
}
我认为这主要归结为一件事:您的测试几乎毫无意义。所有这些块都应该做某事,并使用多个内核和异步操作来做到这一点。
而且,在您的测试中,同步上可能花费了大量时间。使用更实际的代码,代码将花费一些时间来执行,因此争用将减少,因此实际开销将小于您所测量的开销。
但是要真正回答您的问题,是的,您忽略了一些性能调整。具体来说,SingleProducerConstrained
,意味着可以使用锁定较少的数据结构。如果我在两个模块上都使用了它(SingleProducerConstrained
在这里完全没用,您可以放心地删除它),则速率将从每秒约3-4百万个项目提高到计算机上的500万个以上。
为了补充svick的答案,测试仅对单个操作块使用单个处理线程。这样,它只测试使用块的开销。
DataFlow以类似于F#代理,Scala actor和MPI实现的方式工作。每个动作块一次执行一个任务,监听输入并产生输出。通过分步执行可在多个内核上独立执行的算法(仅将消息彼此传递)来提供加速。
虽然您可以增加并发任务的数量,但是最重要的问题是设计一种流程,该流程独立于其他步骤执行最大数量的步骤。
您还可以增加数据流块的并行度。如果您发现其中一个块成为其余块的瓶颈,则这可能会提供额外的加速,并且还有助于线性任务之间的负载平衡。
如果您的工作负载非常细微,以至于您希望每秒处理数百万条消息,那么由于相关的开销,通过管道传递单个消息变得不可行。您需要通过将消息批处理到数组或列表中来分块工作量。例如:
BufferBlock
对于批处理输入,您可以使用var transform = new TransformBlock<int[], string[]>(batch =>
{
var results = new string[batch.Length];
for (int i = 0; i < batch.Length; i++)
{
results[i] = ProcessItem(batch[i]);
}
return results;
});
包中的BatchBlock
或“ linqy” BatchBlock
扩展方法,或使用Buffer
包中的功能类似的Buffer
方法,也可以使用手动。