我正试图使用 TPL Dataflow
我可以将消息存储在一个批处理块中,每当它的阈值达到时,它就会将数据发送到一个动作块中.我已经添加了一个缓冲块,以防动作块太慢。到目前为止,我已经尝试了所有可能的方法来将数据从第一个块移动到第二个块,但没有用。我已经链接了这些块,添加了 DataFlowLinkOptions
的 PropagateCompletion
设为 true
. 我还需要做什么才能让这个管道工作?
管线
class LogPipeline<T>
{
private ActionBlock<T[]> actionBlock;
private BufferBlock<T> bufferBlock;
private BatchBlock<T> batchBlock;
private readonly Action<T[]> action;
private readonly int BufferSize;
private readonly int BatchSize;
public LogPipeline(Action<T[]> action, int bufferSize = 4, int batchSize = 2)
{
this.BufferSize = bufferSize;
this.BatchSize = batchSize;
this.action = action;
}
private void Initialize()
{
this.bufferBlock = new BufferBlock<T>(new DataflowBlockOptions
{ TaskScheduler = TaskScheduler.Default,
BoundedCapacity = this.BufferSize });
this.actionBlock = new ActionBlock<T[]>(this.action);
this.batchBlock = new BatchBlock<T>(BatchSize);
this.bufferBlock.LinkTo(this.batchBlock, new DataflowLinkOptions
{ PropagateCompletion = true });
this.batchBlock.LinkTo(this.actionBlock, new DataflowLinkOptions
{ PropagateCompletion = true });
}
public void Post(T log)
{
this.bufferBlock.Post(log);
}
public void Start()
{
this.Initialize();
}
public void Stop()
{
actionBlock.Complete();
}
}
测试
[TestCase(100, 1000, 5)]
public void CanBatchPipelineResults(int batchSize, int bufferSize, int cycles)
{
List<int> data = new List<int>();
LogPipeline<int> logPipeline = new LogPipeline<int>(
batchSize: batchSize,
bufferSize: bufferSize,
action: (logs) =>
{
data.AddRange(logs);
});
logPipeline.Start();
int SelectWithEffect(int element)
{
logPipeline.Post(element);
return 3;
}
int count = 0;
while (true)
{
if (count++ > cycles)
{
break;
}
var sent = Parallel.For(0, bufferSize, (x) => SelectWithEffect(x));
}
logPipeline.Stop();
Assert.IsTrue(data.Count == cycles * batchSize);
}
为什么除了缓冲区之外,我所有的块都是空的?我试过用 SendAsync
也无济于事。无论我怎么做,数据都不会从第一块移动到下一块。
我有带链接和不带链接的选项。
更新 我已经完全删除了管道,还删除了 Parallel
.我尝试了各种输入块(batchbuffertransform),似乎没有办法让后续的块得到什么。我也试过用 await SendAsync
以及 Post
. 我只试过内 unit tests
类。会不会是这个问题?
更新2我是错的复杂的东西 , 我已经尝试了一个更简单的例子 . 在一个testcase里面,即使这样也不工作。
List<int> items=new List<int>();
var tf=new TransformBlock<int,int>(x=>x+1);
var action= new ActionBlock<int>(x=>items.Add(x));
tf.LinkTo(action, new DataFlowOptions{ PropagateCompletion=true});
tf.Post(3);
//Breakpoint here
在测试结束前似乎没有任何事情发生,原因是没有一个块有机会运行。代码 块 所有的CPU通过使用 Parallel.For
所以没有其他任务有机会运行。这意味着所有发布的消息仍然在第一块中。然后代码调用 Complete
的最后一个块,但甚至不等它处理完就检查结果。
这段代码可以简化很多。首先,所有的块都有输入缓冲区,它们不需要额外的缓冲。
管道可以只用这个.Pipeline来代替。
//Arrange
var list=new List<int>();
var head=new BatchBlock<int>(BatchSize);
var act=new ActionBlock<int[]>(nums=>list.AddRange(nums);
var options= new DataflowLinkOptions{ PropagateCompletion = true };
head.LinkTo(act);
//ACT
//Just fire everything at once, because why not
var tasks=Enumerable.Range(0,cycles)(
i=>Task.Run(()=> head.Post(i)));
await tasks;
//Tell the head block we're done
head.Complete();
//Wait for the last block to complete
await act.Completion;
//ASSERT
Assert.Equal(cycles, data.Count);
没有必要创建一个复杂的类来封装流水线。它不会 "启动"--如果没有数据,块就什么都不做。为了抽象它,我们只需要提供对头部块和最后一个块的访问。Completion
任务
通过调用 logPipeline.Stop
的数据后,立即向 BufferBlock
,你正在完成 ActionBlock
因此,它拒绝了所有的信息。BatchBlock
是想以后发给它。从文档中的 ActionBlock.Complete
方法,向数据流块发出信号,表明它不应该接受或产生更多的消息,也不应该消耗更多的延迟消息。
向数据流块发出信号,表明它不应该再接受或产生任何消息,也不应该再消耗任何推迟的消息。
更新了。 关于问题中的更新要求:
每当它的阈值被击中,它就会把数据发送到一个动作块。
....我的建议是将这个逻辑移入到 LogPipeline.Post
方法。该方法 BufferBlock.Post
返回 false
如果该块还没有接受发送到它的数据。
public void Post(T log)
{
if (!this.bufferBlock.Post(log)) this.actionBlock.Post(log);
}